1use std::sync::Arc;
16use std::time::Duration;
17
18use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
19use catalog::information_extension::DistributedInformationExtension;
20use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
21use clap::Parser;
22use client::client_manager::NodeClients;
23use common_base::Plugins;
24use common_config::Configurable;
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
27use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
28use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
29use common_meta::heartbeat::handler::HandlerGroupExecutor;
30use common_meta::key::flow::FlowMetadataManager;
31use common_meta::key::TableMetadataManager;
32use common_telemetry::info;
33use common_telemetry::logging::TracingOptions;
34use common_version::{short_version, version};
35use flow::{
36 get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
37 FrontendClient, FrontendInvoker,
38};
39use meta_client::{MetaClientOptions, MetaClientType};
40use snafu::{ensure, OptionExt, ResultExt};
41use tracing_appender::non_blocking::WorkerGuard;
42
43use crate::error::{
44 BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
45 MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
46};
47use crate::options::{GlobalOptions, GreptimeOptions};
48use crate::{log_versions, App};
49
/// Application name of the flownode. It is returned by `App::name` and passed to the
/// logging and version-reporting setup in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-flownode";

/// Top-level options of the flownode command: `GreptimeOptions` parameterized with the
/// flow-specific options, which are reachable through its `component` field.
type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
53
/// A flownode application: the flownode itself bundled with the logging guards created
/// while it was being built.
pub struct Instance {
    /// The wrapped flownode; started and shut down through the `App` implementation.
    flownode: FlownodeInstance,

    // Guards returned by `common_telemetry::init_global_logging`. The field is never read;
    // it only keeps the guards alive for as long as the instance exists
    // (presumably so buffered log output keeps being flushed — confirm against
    // `tracing_appender`'s `WorkerGuard` docs).
    _guard: Vec<WorkerGuard>,
}
60
61impl Instance {
62 pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
63 Self {
64 flownode,
65 _guard: guard,
66 }
67 }
68
69 pub fn flownode(&self) -> &FlownodeInstance {
70 &self.flownode
71 }
72
73 pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
75 &mut self.flownode
76 }
77}
78
79#[async_trait::async_trait]
80impl App for Instance {
81 fn name(&self) -> &str {
82 APP_NAME
83 }
84
85 async fn start(&mut self) -> Result<()> {
86 plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
87 .await
88 .context(StartFlownodeSnafu)?;
89
90 self.flownode.start().await.context(StartFlownodeSnafu)
91 }
92
93 async fn stop(&mut self) -> Result<()> {
94 self.flownode
95 .shutdown()
96 .await
97 .context(ShutdownFlownodeSnafu)
98 }
99}
100
// Command-line entry point of the flownode: it only carries the selected subcommand and
// forwards `build` / `load_options` to it.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
106
107impl Command {
108 pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
109 self.subcmd.build(opts).await
110 }
111
112 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
113 match &self.subcmd {
114 SubCommand::Start(cmd) => cmd.load_options(global_options),
115 }
116 }
117}
118
// Subcommands accepted by the flownode command; `Start` is the only one defined here.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments
// into help text, so adding them would change the CLI output.
#[derive(Parser)]
enum SubCommand {
    // Builds a flownode `Instance` from the given options (see `StartCommand::build`).
    Start(StartCommand),
}
123
124impl SubCommand {
125 async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
126 match self {
127 SubCommand::Start(cmd) => cmd.build(opts).await,
128 }
129 }
130}
131
// Flags of the `start` subcommand. Every optional flag that is set overrides the matching
// value of the loaded options in `merge_with_cli_options`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments
// into help text, so adding them would change the CLI output.
#[derive(Debug, Parser, Default)]
struct StartCommand {
    // Overrides `node_id`. A node id must be present after merging, otherwise
    // `merge_with_cli_options` fails with a missing-config error.
    #[clap(long)]
    node_id: Option<u64>,
    // Overrides `grpc.bind_addr`; also accepted under the alias `--rpc-addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`; also accepted under the alias `--rpc-hostname`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Comma-separated list (one or more values) that overrides `meta_client.metasrv_addrs`;
    // default meta client options are created first if none are configured.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Optional configuration file path handed to `load_layered_options`.
    #[clap(short, long)]
    config_file: Option<String>,
    // Environment-variable prefix handed to `load_layered_options`.
    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
    env_prefix: String,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; the value is interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
}
163
164impl StartCommand {
165 fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
166 let mut opts = FlownodeOptions::load_layered_options(
167 self.config_file.as_deref(),
168 self.env_prefix.as_ref(),
169 )
170 .context(LoadLayeredConfigSnafu)?;
171
172 self.merge_with_cli_options(global_options, &mut opts)?;
173
174 Ok(opts)
175 }
176
177 fn merge_with_cli_options(
179 &self,
180 global_options: &GlobalOptions,
181 opts: &mut FlownodeOptions,
182 ) -> Result<()> {
183 let opts = &mut opts.component;
184
185 if let Some(dir) = &global_options.log_dir {
186 opts.logging.dir.clone_from(dir);
187 }
188
189 if global_options.log_level.is_some() {
190 opts.logging.level.clone_from(&global_options.log_level);
191 }
192
193 opts.tracing = TracingOptions {
194 #[cfg(feature = "tokio-console")]
195 tokio_console_addr: global_options.tokio_console_addr.clone(),
196 };
197
198 if let Some(addr) = &self.rpc_bind_addr {
199 opts.grpc.bind_addr.clone_from(addr);
200 }
201
202 if let Some(server_addr) = &self.rpc_server_addr {
203 opts.grpc.server_addr.clone_from(server_addr);
204 }
205
206 if let Some(node_id) = self.node_id {
207 opts.node_id = Some(node_id);
208 }
209
210 if let Some(metasrv_addrs) = &self.metasrv_addrs {
211 opts.meta_client
212 .get_or_insert_with(MetaClientOptions::default)
213 .metasrv_addrs
214 .clone_from(metasrv_addrs);
215 }
216
217 if let Some(http_addr) = &self.http_addr {
218 opts.http.addr.clone_from(http_addr);
219 }
220
221 if let Some(http_timeout) = self.http_timeout {
222 opts.http.timeout = Duration::from_secs(http_timeout);
223 }
224
225 if let Some(user_provider) = &self.user_provider {
226 opts.user_provider = Some(user_provider.clone());
227 }
228
229 ensure!(
230 opts.node_id.is_some(),
231 MissingConfigSnafu {
232 msg: "Missing node id option"
233 }
234 );
235
236 Ok(())
237 }
238
239 async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
240 common_runtime::init_global_runtimes(&opts.runtime);
241
242 let guard = common_telemetry::init_global_logging(
243 APP_NAME,
244 &opts.component.logging,
245 &opts.component.tracing,
246 opts.component.node_id.map(|x| x.to_string()),
247 None,
248 );
249 log_versions(version(), short_version(), APP_NAME);
250
251 info!("Flownode start command: {:#?}", self);
252 info!("Flownode options: {:#?}", opts);
253
254 let plugin_opts = opts.plugins;
255 let mut opts = opts.component;
256 opts.grpc.detect_server_addr();
257
258 let mut plugins = Plugins::new();
259 plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
260 .await
261 .context(StartFlownodeSnafu)?;
262
263 let member_id = opts
264 .node_id
265 .context(MissingConfigSnafu { msg: "'node_id'" })?;
266
267 let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
268 msg: "'meta_client_options'",
269 })?;
270
271 let meta_client = meta_client::create_meta_client(
272 MetaClientType::Flownode { member_id },
273 meta_config,
274 None,
275 )
276 .await
277 .context(MetaClientInitSnafu)?;
278
279 let cache_max_capacity = meta_config.metadata_cache_max_capacity;
280 let cache_ttl = meta_config.metadata_cache_ttl;
281 let cache_tti = meta_config.metadata_cache_tti;
282
283 let cached_meta_backend =
285 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
286 .cache_max_capacity(cache_max_capacity)
287 .cache_ttl(cache_ttl)
288 .cache_tti(cache_tti)
289 .build();
290 let cached_meta_backend = Arc::new(cached_meta_backend);
291
292 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
294 CacheRegistryBuilder::default()
295 .add_cache(cached_meta_backend.clone())
296 .build(),
297 );
298 let fundamental_cache_registry =
299 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
300 let layered_cache_registry = Arc::new(
301 with_default_composite_cache_registry(
302 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
303 )
304 .context(BuildCacheRegistrySnafu)?
305 .build(),
306 );
307
308 let information_extension =
309 Arc::new(DistributedInformationExtension::new(meta_client.clone()));
310 let catalog_manager = KvBackendCatalogManager::new(
311 information_extension,
312 cached_meta_backend.clone(),
313 layered_cache_registry.clone(),
314 None,
315 );
316
317 let table_metadata_manager =
318 Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
319 table_metadata_manager
320 .init()
321 .await
322 .context(InitMetadataSnafu)?;
323
324 let executor = HandlerGroupExecutor::new(vec![
325 Arc::new(ParseMailboxMessageHandler),
326 Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
327 ]);
328
329 let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
330 &opts,
331 meta_client.clone(),
332 opts.heartbeat.clone(),
333 Arc::new(executor),
334 );
335
336 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
337 let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
338 let frontend_client =
339 FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
340 let flownode_builder = FlownodeBuilder::new(
341 opts.clone(),
342 plugins,
343 table_metadata_manager,
344 catalog_manager.clone(),
345 flow_metadata_manager,
346 Arc::new(frontend_client),
347 )
348 .with_heartbeat_task(heartbeat_task);
349
350 let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
351 let services = FlownodeServiceBuilder::new(&opts)
352 .with_grpc_server(flownode.flownode_server().clone())
353 .enable_http_service()
354 .build()
355 .context(StartFlownodeSnafu)?;
356 flownode.setup_services(services);
357 let flownode = flownode;
358
359 let channel_config = ChannelConfig {
362 timeout: None,
363 ..Default::default()
364 };
365 let client = Arc::new(NodeClients::new(channel_config));
366
367 let invoker = FrontendInvoker::build_from(
368 flownode.flow_engine().streaming_engine(),
369 catalog_manager.clone(),
370 cached_meta_backend.clone(),
371 layered_cache_registry.clone(),
372 meta_client.clone(),
373 client,
374 )
375 .await
376 .context(StartFlownodeSnafu)?;
377 flownode
378 .flow_engine()
379 .streaming_engine()
380 .set_frontend_invoker(invoker)
382 .await;
383
384 Ok(Instance::new(flownode, guard))
385 }
386}