1use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{
23 CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend,
24};
25use clap::Parser;
26use client::client_manager::NodeClients;
27use common_base::Plugins;
28use common_config::{Configurable, DEFAULT_DATA_HOME};
29use common_grpc::channel_manager::ChannelConfig;
30use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
31use common_meta::heartbeat::handler::HandlerGroupExecutor;
32use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
33use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
34use common_meta::key::TableMetadataManager;
35use common_meta::key::flow::FlowMetadataManager;
36use common_stat::ResourceStatImpl;
37use common_telemetry::info;
38use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
39use common_version::{short_version, verbose_version};
40use flow::{
41 FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
42};
43use meta_client::{MetaClientOptions, MetaClientType};
44use plugins::flownode::context::GrpcConfigureContext;
45use servers::addrs;
46use servers::configurator::GrpcBuilderConfiguratorRef;
47use snafu::{OptionExt, ResultExt, ensure};
48use tracing_appender::non_blocking::WorkerGuard;
49
50use crate::error::{
51 BuildCacheRegistrySnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
52 OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
53};
54use crate::options::{GlobalOptions, GreptimeOptions};
55use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
56
57pub const APP_NAME: &str = "greptime-flownode";
58
59type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
60
61pub struct Instance {
62 flownode: FlownodeInstance,
63 _guard: Vec<WorkerGuard>,
65}
66
67impl Instance {
68 pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
69 Self {
70 flownode,
71 _guard: guard,
72 }
73 }
74
75 pub fn flownode(&self) -> &FlownodeInstance {
76 &self.flownode
77 }
78
79 pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
81 &mut self.flownode
82 }
83}
84
85#[async_trait::async_trait]
86impl App for Instance {
87 fn name(&self) -> &str {
88 APP_NAME
89 }
90
91 async fn start(&mut self) -> Result<()> {
92 plugins::start_flownode_plugins(&self.flownode)
93 .await
94 .context(StartFlownodeSnafu)?;
95
96 self.flownode.start().await.context(StartFlownodeSnafu)
97 }
98
99 async fn stop(&mut self) -> Result<()> {
100 self.flownode
101 .shutdown()
102 .await
103 .context(ShutdownFlownodeSnafu)
104 }
105}
106
107#[derive(Parser)]
108pub struct Command {
109 #[clap(subcommand)]
110 subcmd: SubCommand,
111}
112
113impl Command {
114 pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
115 self.subcmd.build(opts).await
116 }
117
118 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
119 match &self.subcmd {
120 SubCommand::Start(cmd) => cmd.load_options(global_options),
121 }
122 }
123}
124
125#[derive(Parser)]
126enum SubCommand {
127 Start(StartCommand),
128}
129
130impl SubCommand {
131 async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
132 match self {
133 SubCommand::Start(cmd) => cmd.build(opts).await,
134 }
135 }
136}
137
138#[derive(Debug, Parser, Default)]
139struct StartCommand {
140 #[clap(long)]
142 node_id: Option<u64>,
143 #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
145 grpc_bind_addr: Option<String>,
146 #[clap(
150 long = "grpc-server-addr",
151 alias = "rpc-server-addr",
152 alias = "rpc-hostname"
153 )]
154 grpc_server_addr: Option<String>,
155 #[clap(long, value_delimiter = ',', num_args = 1..)]
157 metasrv_addrs: Option<Vec<String>>,
158 #[clap(short, long)]
160 config_file: Option<String>,
161 #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
163 env_prefix: String,
164 #[clap(long)]
165 http_addr: Option<String>,
166 #[clap(long)]
168 http_timeout: Option<u64>,
169}
170
171impl StartCommand {
172 fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
173 let mut opts = FlownodeOptions::load_layered_options(
174 self.config_file.as_deref(),
175 self.env_prefix.as_ref(),
176 )
177 .context(LoadLayeredConfigSnafu)?;
178
179 self.merge_with_cli_options(global_options, &mut opts)?;
180
181 Ok(opts)
182 }
183
184 fn merge_with_cli_options(
186 &self,
187 global_options: &GlobalOptions,
188 opts: &mut FlownodeOptions,
189 ) -> Result<()> {
190 let opts = &mut opts.component;
191
192 if let Some(dir) = &global_options.log_dir {
193 opts.logging.dir.clone_from(dir);
194 }
195
196 if opts.logging.dir.is_empty() {
198 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
199 .join(DEFAULT_LOGGING_DIR)
200 .to_string_lossy()
201 .to_string();
202 }
203
204 if global_options.log_level.is_some() {
205 opts.logging.level.clone_from(&global_options.log_level);
206 }
207
208 opts.tracing = TracingOptions {
209 #[cfg(feature = "tokio-console")]
210 tokio_console_addr: global_options.tokio_console_addr.clone(),
211 };
212
213 if let Some(addr) = &self.grpc_bind_addr {
214 opts.grpc.bind_addr.clone_from(addr);
215 }
216
217 if let Some(server_addr) = &self.grpc_server_addr {
218 opts.grpc.server_addr.clone_from(server_addr);
219 }
220
221 if let Some(node_id) = self.node_id {
222 opts.node_id = Some(node_id);
223 }
224
225 if let Some(metasrv_addrs) = &self.metasrv_addrs {
226 opts.meta_client
227 .get_or_insert_with(MetaClientOptions::default)
228 .metasrv_addrs
229 .clone_from(metasrv_addrs);
230 }
231
232 if let Some(http_addr) = &self.http_addr {
233 opts.http.addr.clone_from(http_addr);
234 }
235
236 if let Some(http_timeout) = self.http_timeout {
237 opts.http.timeout = Duration::from_secs(http_timeout);
238 }
239
240 ensure!(
241 opts.node_id.is_some(),
242 MissingConfigSnafu {
243 msg: "Missing node id option"
244 }
245 );
246
247 Ok(())
248 }
249
250 async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
251 common_runtime::init_global_runtimes(&opts.runtime);
252
253 let guard = common_telemetry::init_global_logging(
254 APP_NAME,
255 &opts.component.logging,
256 &opts.component.tracing,
257 opts.component.node_id.map(|x| x.to_string()),
258 None,
259 );
260
261 log_versions(verbose_version(), short_version(), APP_NAME);
262 maybe_activate_heap_profile(&opts.component.memory);
263 create_resource_limit_metrics(APP_NAME);
264
265 info!("Flownode start command: {:#?}", self);
266 info!("Flownode options: {:#?}", opts);
267
268 let plugin_opts = opts.plugins;
269 let mut opts = opts.component;
270 opts.grpc.detect_server_addr();
271
272 let mut plugins = Plugins::new();
273 plugins::setup_flownode_plugins_pre_build(&mut plugins, &plugin_opts, &opts)
274 .await
275 .context(StartFlownodeSnafu)?;
276
277 let member_id = opts
278 .node_id
279 .context(MissingConfigSnafu { msg: "'node_id'" })?;
280
281 let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
282 msg: "'meta_client_options'",
283 })?;
284
285 let meta_client = meta_client::create_meta_client(
286 MetaClientType::Flownode { member_id },
287 meta_config,
288 None,
289 None,
290 )
291 .await
292 .context(MetaClientInitSnafu)?;
293
294 let cache_max_capacity = meta_config.metadata_cache_max_capacity;
295 let cache_ttl = meta_config.metadata_cache_ttl;
296 let cache_tti = meta_config.metadata_cache_tti;
297
298 let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
299
300 let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
302 .cache_max_capacity(cache_max_capacity)
303 .cache_ttl(cache_ttl)
304 .cache_tti(cache_tti)
305 .build();
306 let cached_meta_backend = Arc::new(cached_meta_backend);
307
308 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
310 CacheRegistryBuilder::default()
311 .add_cache(cached_meta_backend.clone())
312 .build(),
313 );
314 let fundamental_cache_registry =
315 build_fundamental_cache_registry(readonly_meta_backend.clone());
316 let layered_cache_registry = Arc::new(
317 with_default_composite_cache_registry(
318 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
319 )
320 .context(BuildCacheRegistrySnafu)?
321 .build(),
322 );
323
324 let channel_config = ChannelConfig {
327 timeout: None,
328 ..Default::default()
329 };
330 let client = Arc::new(NodeClients::new(channel_config));
331
332 let information_extension = Arc::new(DistributedInformationExtension::new(
333 meta_client.clone(),
334 client.clone(),
335 ));
336 let catalog_manager = KvBackendCatalogManagerBuilder::new(
337 information_extension,
338 cached_meta_backend.clone(),
339 layered_cache_registry.clone(),
340 )
341 .build();
342
343 let table_metadata_manager =
344 Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
345
346 let executor = HandlerGroupExecutor::new(vec![
347 Arc::new(ParseMailboxMessageHandler),
348 Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
349 ]);
350
351 let mut resource_stat = ResourceStatImpl::default();
352 resource_stat.start_collect_cpu_usage();
353
354 let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
355 &opts,
356 meta_client.clone(),
357 Arc::new(executor),
358 Arc::new(resource_stat),
359 );
360
361 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
362 let frontend_client = FrontendClient::from_meta_client(
363 meta_client.clone(),
364 opts.query.clone(),
365 opts.flow.batching_mode.clone(),
366 )
367 .context(StartFlownodeSnafu)?;
368 let frontend_client = Arc::new(frontend_client);
369 let mut flownode_builder = FlownodeBuilder::new(
370 opts.clone(),
371 plugins.clone(),
372 table_metadata_manager,
373 catalog_manager.clone(),
374 flow_metadata_manager,
375 frontend_client.clone(),
376 )
377 .with_heartbeat_task(heartbeat_task);
378
379 plugins::setup_flownode_plugins_post_build(&mut plugins, &plugin_opts, &flownode_builder)
380 .await
381 .context(StartFlownodeSnafu)?;
382 flownode_builder.set_plugins(plugins.clone());
383
384 let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
385
386 let builder =
387 FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
388 let builder = if let Some(configurator) =
389 plugins.get::<GrpcBuilderConfiguratorRef<GrpcConfigureContext>>()
390 {
391 let context = GrpcConfigureContext {
392 kv_backend: cached_meta_backend.clone(),
393 fe_client: frontend_client.clone(),
394 flownode_id: member_id,
395 catalog_manager: catalog_manager.clone(),
396 };
397 configurator
398 .configure(builder, context)
399 .await
400 .context(OtherSnafu)?
401 } else {
402 builder
403 };
404 let grpc_server = builder.build();
405
406 let services = FlownodeServiceBuilder::new(&opts)
407 .with_grpc_server(grpc_server)
408 .enable_http_service()
409 .build()
410 .context(StartFlownodeSnafu)?;
411 flownode.setup_services(services);
412 let flownode = flownode;
413
414 let invoker = FrontendInvoker::build_from(
415 flownode.flow_engine().streaming_engine(),
416 catalog_manager.clone(),
417 cached_meta_backend.clone(),
418 layered_cache_registry.clone(),
419 meta_client.clone(),
420 client,
421 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
422 )
423 .await
424 .context(StartFlownodeSnafu)?;
425 flownode
426 .flow_engine()
427 .streaming_engine()
428 .set_frontend_invoker(invoker)
430 .await;
431
432 Ok(Instance::new(flownode, guard))
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use clap::{CommandFactory, Parser};
439
440 use super::*;
441
442 #[test]
443 fn test_parse_grpc_cli_aliases() {
444 let command = StartCommand::try_parse_from([
445 "flownode",
446 "--grpc-bind-addr",
447 "127.0.0.1:14004",
448 "--grpc-server-addr",
449 "10.0.0.1:14004",
450 ])
451 .unwrap();
452 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:14004"));
453 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.1:14004"));
454
455 let command = StartCommand::try_parse_from([
456 "flownode",
457 "--rpc-bind-addr",
458 "127.0.0.1:24004",
459 "--rpc-server-addr",
460 "10.0.0.2:24004",
461 ])
462 .unwrap();
463 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:24004"));
464 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.2:24004"));
465
466 let command = StartCommand::try_parse_from([
467 "flownode",
468 "--rpc-addr",
469 "127.0.0.1:34004",
470 "--rpc-hostname",
471 "10.0.0.3:34004",
472 ])
473 .unwrap();
474 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:34004"));
475 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.3:34004"));
476 }
477
478 #[test]
479 fn test_help_uses_grpc_option_names() {
480 let mut cmd = StartCommand::command();
481 let mut help = Vec::new();
482 cmd.write_long_help(&mut help).unwrap();
483 let help = String::from_utf8(help).unwrap();
484
485 assert!(help.contains("--grpc-bind-addr"));
486 assert!(help.contains("--grpc-server-addr"));
487 assert!(!help.contains("--rpc-bind-addr"));
488 assert!(!help.contains("--rpc-server-addr"));
489 assert!(!help.contains("--rpc-addr"));
490 assert!(!help.contains("--rpc-hostname"));
491 assert!(!help.contains("--user-provider"));
492 }
493
494 #[test]
495 fn test_user_provider_cli_option_is_removed() {
496 let command = StartCommand::try_parse_from([
497 "flownode",
498 "--node-id",
499 "14",
500 "--user-provider",
501 "static_user_provider:cmd:test=test",
502 ]);
503 assert!(command.is_err());
504 }
505}