Apache Flume is a real-time ETL tool for data warehouse platforms. It consists of several types of components, all of which are managed at runtime by Flume’s lifecycle and supervisor mechanism. This article walks through the source code behind Flume’s component lifecycle management.
Repository Structure
Flume’s source code can be downloaded from GitHub. It’s a Maven project, so we can import it into an IDE for efficient code reading. The following is the main structure of the project:
/flume-ng-node
Application Entrance
The main entry point of the Flume agent is the org.apache.flume.node.Application class in the flume-ng-node module. The following is an abridged version of its source code:
public class Application {
The process can be illustrated as follows:
- Parse command line arguments with commons-cli, including the Flume agent’s name, configuration method and path.
- Configurations can be provided via a properties file or ZooKeeper. Both providers support live-reload, i.e. we can update component settings without restarting the agent.
  - File-based live-reload is implemented with a background thread that checks the last modification time of the file.
  - ZooKeeper-based live-reload is provided by Curator’s NodeCache recipe, which uses ZooKeeper’s watch functionality underneath.
- If live-reload is on (the default), configuration providers add themselves to the application’s component list. After Application#start is called, a LifecycleSupervisor starts the provider, which triggers the reload event to parse the configuration and load all defined components.
- If live-reload is off, configuration providers parse the file immediately and start all components, which are also supervised by LifecycleSupervisor.
- Finally, a JVM shutdown hook is added with Runtime#addShutdownHook, which in turn invokes Application#stop to shut down the Flume agent.
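The file-based live-reload step above can be sketched in plain Java. This is only a minimal illustration under stated assumptions, not Flume's actual provider: FileChangeDetector, hasChanged and watch are hypothetical names, and the timestamp is read through a LongSupplier so the check can be exercised without touching the file system.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper (not a Flume class): reports a change whenever the
// observed modification time is newer than the one seen on the previous check.
class FileChangeDetector {
    private final LongSupplier lastModified; // e.g. file::lastModified
    private long lastSeen = 0L;              // 0 means "nothing seen yet"

    FileChangeDetector(LongSupplier lastModified) {
        this.lastModified = lastModified;
    }

    synchronized boolean hasChanged() {
        long modified = lastModified.getAsLong();
        if (modified > lastSeen) {
            lastSeen = modified;
            return true;
        }
        return false;
    }

    // Polls the file from a background thread and runs onChange after each
    // detected modification. The caller owns the returned executor.
    static ScheduledExecutorService watch(File file, Runnable onChange, long periodSeconds) {
        FileChangeDetector detector = new FileChangeDetector(file::lastModified);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            if (detector.hasChanged()) {
                onChange.run(); // in an agent this would re-parse the configuration
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }
}
```

Because the first check always sees a timestamp newer than zero, the initial load and later reloads go through the same code path, which matches the idea that starting the provider is what triggers the first configuration event.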
Configuration Reload
When PollingPropertiesFileConfigurationProvider detects a file change, it invokes the AbstractConfigurationProvider#getConfiguration method to parse the configuration file into a MaterializedConfiguration instance, which contains the source, sink, and channel definitions. The polling thread then sends an event to Application via a Guava EventBus instance, which effectively invokes the Application#handleConfigurationEvent method to reload all components.
// Application class
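The event-driven reload described above can be illustrated with a small publish/subscribe sketch. To keep the block free of external dependencies it uses a hand-written ConfigEventBus as a stand-in for Guava's EventBus, and ParsedConfig as a simplified stand-in for MaterializedConfiguration; both names, and ReloadingApp, are hypothetical. It also assumes that a reload means stopping the running components before starting the new set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-in for Guava's EventBus (hypothetical, standard library only).
class ConfigEventBus<E> {
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();

    void register(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    void post(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

// Simplified stand-in for a parsed configuration: just the component names.
class ParsedConfig {
    final List<String> componentNames;

    ParsedConfig(List<String> componentNames) {
        this.componentNames = Collections.unmodifiableList(new ArrayList<>(componentNames));
    }
}

// Hypothetical application: every configuration event replaces the running set.
class ReloadingApp {
    private List<String> running = Collections.emptyList();
    final List<String> log = new ArrayList<>();

    synchronized void handleConfigurationEvent(ParsedConfig conf) {
        for (String name : running) {
            log.add("stop " + name);   // assumption: old components stop first
        }
        running = conf.componentNames;
        for (String name : running) {
            log.add("start " + name);
        }
    }
}
```

The provider side only needs a reference to the bus, so the thread that notices the change never has to know which object performs the reload.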
Start Components
The startup logic lives in Application#startAllComponents. The method accepts a new set of components and starts the channels first, followed by the sinks and then the sources.
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
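The start order can be shown with a short sketch. StartOrderSketch, Kind and Component are hypothetical names used only for illustration; the real method works on the contents of a MaterializedConfiguration and hands each component to the supervisor. The rationale in the comments (channels must exist before anything reads from or writes to them) is an interpretation, not something the source code states.

```java
import java.util.ArrayList;
import java.util.List;

class StartOrderSketch {
    enum Kind { CHANNEL, SINK, SOURCE }

    static final class Component {
        final String name;
        final Kind kind;

        Component(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    // Returns component names in the order they would be started:
    // channels, then sinks, then sources, regardless of input order.
    static List<String> startAll(List<Component> components) {
        List<String> started = new ArrayList<>();
        Kind[] phases = {Kind.CHANNEL, Kind.SINK, Kind.SOURCE};
        for (Kind phase : phases) {
            for (Component component : components) {
                if (component.kind == phase) {
                    // A real agent would pass the component to its supervisor here.
                    started.add(component.name);
                }
            }
        }
        return started;
    }
}
```

Starting sources last presumably ensures that no events arrive before the channels and sinks behind them are ready.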
The LifecycleSupervisor manages instances that implement the LifecycleAware interface. For each one, the supervisor schedules a MonitorRunnable with a fixed delay of 3 seconds, which tries to bring the LifecycleAware instance into its desiredState by calling LifecycleAware#start or LifecycleAware#stop.
public static class MonitorRunnable implements Runnable {
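The monitoring loop can be sketched as a Runnable that compares the current state with the desired state on every run. This is a simplified illustration, not Flume's MonitorRunnable: SupervisorSketch, Lifecycle, Monitor and FlakyComponent are hypothetical names, and the error handling is reduced to "try again on the next run".

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SupervisorSketch {
    enum State { IDLE, START, STOP }

    interface Lifecycle {
        void start();
        void stop();
        State getState();
    }

    // Each run nudges the component toward the desired state.
    static final class Monitor implements Runnable {
        private final Lifecycle component;
        volatile State desiredState;

        Monitor(Lifecycle component, State desiredState) {
            this.component = component;
            this.desiredState = desiredState;
        }

        @Override
        public void run() {
            if (component.getState() == desiredState) {
                return; // nothing to do
            }
            try {
                if (desiredState == State.START) {
                    component.start();
                } else if (desiredState == State.STOP) {
                    component.stop();
                }
            } catch (RuntimeException e) {
                // Swallowed on purpose: the next scheduled run retries.
            }
        }
    }

    // Test component whose first start attempt fails.
    static final class FlakyComponent implements Lifecycle {
        int startCalls = 0;
        private State state = State.IDLE;

        @Override public void start() {
            startCalls++;
            if (startCalls == 1) {
                throw new IllegalStateException("first attempt fails");
            }
            state = State.START;
        }

        @Override public void stop() { state = State.STOP; }

        @Override public State getState() { return state; }
    }

    // Schedules the monitor with a fixed delay between runs.
    static ScheduledExecutorService supervise(Monitor monitor, long delay, TimeUnit unit) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleWithFixedDelay(monitor, 0, delay, unit);
        return pool;
    }
}
```

A call such as supervise(monitor, 3, TimeUnit.SECONDS) would mirror the 3-second delay mentioned above. The useful property of this design is that a component that fails to start is retried automatically without any extra retry logic in the component itself.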
Stop Components
When the JVM is shutting down, the hook invokes Application#stop, which calls LifecycleSupervisor#stop. This method first shuts down the executor pool that runs the MonitorRunnables, then changes every component’s desired state to STOP and waits for them to shut down completely.
public class LifecycleSupervisor implements LifecycleAware {
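The shutdown sequence can be sketched as follows. StopSketch, Stoppable, RecordingComponent, stopAll and installShutdownHook are hypothetical names, and the 5-second timeout is an arbitrary value chosen for the example; only Runtime#addShutdownHook and the ExecutorService calls are standard Java APIs.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class StopSketch {
    interface Stoppable {
        void stop();
        boolean isStopped();
    }

    // Trivial component that only records whether stop() was called.
    static final class RecordingComponent implements Stoppable {
        private volatile boolean stopped = false;

        @Override public void stop() { stopped = true; }

        @Override public boolean isStopped() { return stopped; }
    }

    // Stops the monitor pool first so that no monitor restarts a component
    // while it is being stopped, then stops every component.
    static boolean stopAll(ExecutorService monitorPool,
                           List<? extends Stoppable> components,
                           long timeoutMillis) {
        monitorPool.shutdown();
        try {
            if (!monitorPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                monitorPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            monitorPool.shutdownNow();
        }
        boolean allStopped = true;
        for (Stoppable component : components) {
            component.stop();
            allStopped &= component.isStopped();
        }
        return allStopped;
    }

    // Registers the stop sequence as a JVM shutdown hook.
    static Thread installShutdownHook(ExecutorService monitorPool,
                                      List<? extends Stoppable> components) {
        Thread hook = new Thread(() -> stopAll(monitorPool, components, 5000L),
                "agent-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Shutting down the pool before touching the components matters: if the monitors kept running, one of them could observe a stopped component whose desired state is still START and start it again.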
Source and Source Runner
Take KafkaSource as an example to see how the agent supervises source components; the same applies to sinks and channels.
public class KafkaSource extends AbstractPollableSource {
KafkaSource is a pollable source, which means it needs a runner thread to constantly poll for more data to process.
public class PollableSourceRunner extends SourceRunner {
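The idea of a runner thread that keeps polling a source can be sketched like this. It is a simplified illustration rather than Flume's PollableSourceRunner: PollingRunnerSketch, Pollable and Runner are hypothetical names, and the fixed backoff sleep is an assumption made to keep the example short.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class PollingRunnerSketch {
    enum Status { READY, BACKOFF }

    interface Pollable {
        Status process(); // handle one batch; BACKOFF means "no data right now"
    }

    static final class Runner {
        private final Pollable source;
        private final long backoffMillis;
        private final AtomicBoolean shouldStop = new AtomicBoolean(false);
        private Thread thread;

        Runner(Pollable source, long backoffMillis) {
            this.source = source;
            this.backoffMillis = backoffMillis;
        }

        void start() {
            shouldStop.set(false);
            thread = new Thread(this::loop, "polling-runner");
            thread.start();
        }

        void stop() {
            shouldStop.set(true);
            if (thread == null) {
                return;
            }
            thread.interrupt(); // wake the loop if it is sleeping
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        boolean isRunning() {
            return thread != null && thread.isAlive();
        }

        private void loop() {
            while (!shouldStop.get()) {
                try {
                    if (source.process() == Status.BACKOFF) {
                        Thread.sleep(backoffMillis);
                    }
                } catch (InterruptedException e) {
                    // stop() interrupts the sleep; the loop condition decides what happens next.
                }
            }
        }
    }
}
```

With this split, the source only implements process(), while thread management and the stop signal stay in the runner, which is the object a supervisor would start and stop.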
Both AbstractPollableSource and SourceRunner implement LifecycleAware, which means they have start and stop methods for the supervisor to call. In this case, SourceRunner is the component that the Flume agent actually supervises, while the PollableSource is wrapped in and managed by its SourceRunner. The details are in AbstractConfigurationProvider#loadSources:
private void loadSources(Map<String, SourceRunner> sourceRunnerMap) {
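The wrapping step can be sketched as a small factory that picks a runner based on the kind of source. All types here (RunnerFactorySketch, Source, PollableSource, EventDrivenSource, Runner and its subclasses) are simplified, hypothetical stand-ins defined only for this example; the real method also creates the sources from configuration and wires up channels, which is omitted.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RunnerFactorySketch {
    interface Source { String name(); }
    interface PollableSource extends Source { }    // must be polled by a thread
    interface EventDrivenSource extends Source { } // pushes events on its own

    static abstract class Runner {
        final Source source;

        Runner(Source source) {
            this.source = source;
        }
    }

    static final class PollingRunner extends Runner {
        PollingRunner(Source source) { super(source); }
    }

    static final class EventDrivenRunner extends Runner {
        EventDrivenRunner(Source source) { super(source); }
    }

    // Chooses the runner type that matches the source type.
    static Runner forSource(Source source) {
        if (source instanceof PollableSource) {
            return new PollingRunner(source);
        }
        if (source instanceof EventDrivenSource) {
            return new EventDrivenRunner(source);
        }
        throw new IllegalArgumentException("No known runner type for source " + source.name());
    }

    // Wraps every source in a runner, keyed by source name; the runners,
    // not the sources, are what would be handed to a supervisor.
    static Map<String, Runner> loadSources(List<? extends Source> sources) {
        Map<String, Runner> runners = new LinkedHashMap<>();
        for (Source source : sources) {
            runners.put(source.name(), forSource(source));
        }
        return runners;
    }
}
```

Keeping the map keyed by name makes it straightforward to stop or replace a single runner when the configuration changes.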