A sink is the last component in an Apache Flume data flow; it writes events out to storage such as local files, HDFS, Elasticsearch, etc. In this article, I will illustrate how Flume's HDFS sink works by analyzing its source code, with the help of diagrams.
Sink Component Lifecycle
In the previous article, we learned that every Flume component implements the LifecycleAware interface and is started and monitored by a LifecycleSupervisor. The sink component, however, is not invoked directly by this supervisor; it is wrapped in the SinkRunner and SinkProcessor classes. Flume ships with three different sink processors that connect the channel and sinks with different semantics, but here we only consider the DefaultSinkProcessor, which accepts a single sink, and we will skip the concept of sink groups as well.
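To make the wiring concrete, here is a minimal sketch of the polling loop that SinkRunner drives; the class and field names (PollingLoopSketch, processor, backoffMillis) are illustrative, not the actual ones from the Flume source.

```java
import org.apache.flume.Sink;
import org.apache.flume.SinkProcessor;

// A simplified sketch of SinkRunner's polling thread.
public class PollingLoopSketch implements Runnable {
  private final SinkProcessor processor;       // DefaultSinkProcessor in our case
  private volatile boolean shouldStop = false;
  private final long backoffMillis = 1000;

  public PollingLoopSketch(SinkProcessor processor) {
    this.processor = processor;
  }

  @Override
  public void run() {
    while (!shouldStop) {
      try {
        // DefaultSinkProcessor simply delegates to its single sink's process()
        if (processor.process() == Sink.Status.BACKOFF) {
          Thread.sleep(backoffMillis);         // channel was empty; back off briefly
        }
      } catch (Exception e) {
        // the real runner logs the failure and keeps polling
      }
    }
  }
}
```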

HDFS Sink Classes
The HDFS sink's source code is located in the flume-hdfs-sink sub-module and is composed of the following classes:

The HDFSEventSink class implements the lifecycle methods, including configure, start, process, and stop. It maintains a collection of BucketWriters, keyed by output file path, and delegates received events to them. Depending on which HDFSWriter implementation it is given, a BucketWriter can append data to a text file, a compressed file, or a sequence file.
Configure and Start
When the Flume configuration file is loaded, the configure method is called on every sink component. In HDFSEventSink#configure, it reads the properties prefixed with hdfs. from the context, provides default values, and performs some sanity checks: for instance, batchSize must be greater than 0, codeC must be provided when fileType is CompressedStream, and so on. It also initializes a SinkCounter to provide various metrics for monitoring.
```java
public void configure(Context context) {
```
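The property names below are the actual configuration keys, but the rest is a trimmed-down sketch of the kind of checks configure performs, not the full method.

```java
// Sketch of configure(): read hdfs.-prefixed properties, apply defaults, and
// validate them. Context is org.apache.flume.Context; Preconditions is Guava's.
public void configure(Context context) {
  String filePath = Preconditions.checkNotNull(
      context.getString("hdfs.path"), "hdfs.path is required");
  long batchSize = context.getLong("hdfs.batchSize", 100L);
  String fileType = context.getString("hdfs.fileType", "SequenceFile");
  String codecName = context.getString("hdfs.codeC");

  Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
  if ("CompressedStream".equalsIgnoreCase(fileType)) {
    Preconditions.checkArgument(codecName != null,
        "codeC must be set when fileType is CompressedStream");
  }

  sinkCounter = new SinkCounter(getName());  // metrics for monitoring
}
```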
The SinkProcessor then invokes the HDFSEventSink#start method, in which two thread pools are created. callTimeoutPool is used by BucketWriter#callWithTimeout to limit the time that HDFS calls may take, such as FileSystem#create or FSDataOutputStream#hflush. timedRollerPool is used to schedule time-based file rolling tasks, if the rollInterval property is provided. More details will be covered in the next section.
```java
public void start() {
```
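Roughly, start() sets the two pools up as follows. This is a sketch: the pool sizes come from the hdfs.threadsPoolSize and hdfs.rollTimerPoolSize properties, and the thread name formats here are only illustrative.

```java
// A bounded pool for HDFS calls guarded by callWithTimeout, and a scheduled
// pool for the time-based rolling tasks.
ExecutorService callTimeoutPool = Executors.newFixedThreadPool(
    threadsPoolSize,
    new ThreadFactoryBuilder().setNameFormat("hdfs-sink-call-runner-%d").build());

ScheduledExecutorService timedRollerPool = Executors.newScheduledThreadPool(
    rollTimerPoolSize,
    new ThreadFactoryBuilder().setNameFormat("hdfs-sink-roll-timer-%d").build());
```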
Process Events
The process method contains the main logic, i.e. pulling events from the upstream channel and sending them to HDFS. Here is the flow chart of this method.

Channel Transaction
The code is wrapped in a channel transaction, with some exception handling. Take the Kafka channel for instance: when the transaction begins, it takes events without committing the consumer offset. Only after these events are successfully written to HDFS is the consumed offset committed to Kafka, so the next transaction will consume messages from the new offset.
```java
Channel channel = getChannel();
```
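A condensed sketch of the transactional part of process() follows; error handling and counters are omitted, and writeToHdfs is a placeholder for the BucketWriter lookup and append logic described below.

```java
// Sketch of HDFSEventSink#process (simplified): take up to batchSize events
// inside one channel transaction, write them, then commit or roll back.
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
  int txnEventCount = 0;
  for (; txnEventCount < batchSize; txnEventCount++) {
    Event event = channel.take();
    if (event == null) {
      break;                  // channel has no more events for now
    }
    writeToHdfs(event);       // placeholder: find/create BucketWriter, append
  }
  transaction.commit();       // e.g. the Kafka channel commits offsets here
  return txnEventCount == 0 ? Status.BACKOFF : Status.READY;
} catch (Throwable th) {
  transaction.rollback();     // events will be re-delivered next time
  throw new EventDeliveryException(th);
} finally {
  transaction.close();
}
```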
Find or Create BucketWriter
A BucketWriter corresponds to one HDFS file, and its file path is generated from the configuration. For example:
```properties
a1.sinks.access_log.hdfs.path = /user/flume/access_log/dt=%Y%m%d
```
The generated file paths, temporary and final, will be:
```
/user/flume/access_log/dt=20180925/.events.hostname1.1537848761307.lzo.tmp
```
Placeholders are replaced in BucketPath#escapeString. It supports three kinds of placeholders:
- %{...}: replaced with arbitrary header values;
- %[...]: currently only supports %[localhost], %[ip], and %[fqdn];
- %x: date-time patterns, which require a timestamp entry in the headers, or useLocalTimeStamp to be enabled.
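As a quick illustration, the expansion can be exercised directly. This is a sketch assuming the two-argument BucketPath.escapeString overload; the country header and the timestamp value are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.formatter.output.BucketPath;

public class EscapeStringExample {
  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    headers.put("timestamp", "1537848761307");  // needed by the %Y%m%d pattern
    headers.put("country", "us");               // consumed by %{country}

    // Prints something like: /user/flume/access_log/country=us/dt=20180925
    System.out.println(BucketPath.escapeString(
        "/user/flume/access_log/country=%{country}/dt=%Y%m%d", headers));
  }
}
```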
The prefix and suffix are added in BucketWriter#open: counter is the timestamp at which this bucket is opened or re-opened, and lzo is the default extension of the configured compression codec.
```java
String fullFileName = fileName + "." + counter;
```
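Continuing from that line, the logic looks roughly as follows. This is a sketch: the codec's default extension already includes the dot, and inUsePrefix/inUseSuffix default to "." and ".tmp".

```java
// Sketch of how the temporary (in-use) and final paths are built.
if (codeC != null) {
  fullFileName += codeC.getDefaultExtension();   // e.g. ".lzo" for LZO codecs
}
String bucketPath = filePath + "/" + inUsePrefix + fullFileName + inUseSuffix;  // .tmp file
String targetPath = filePath + "/" + fullFileName;                              // final file
```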
If no BucketWriter is associated with the file path, a new one will be created. First, it creates an HDFSWriter corresponding to the fileType config. Flume supports three kinds of writers: HDFSSequenceFile, HDFSDataStream, and HDFSCompressedDataStream. They handle the actual writing to HDFS files, and will be assigned to the new BucketWriter.
```java
bucketWriter = sfWriters.get(lookupPath);
```
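Expanded slightly, the lookup-or-create logic works roughly like this. It is a sketch: synchronization and idle-timeout handling are left out, and createBucketWriter is a hypothetical helper standing in for the long BucketWriter constructor.

```java
// sfWriters caches open BucketWriters by their resolved file path.
BucketWriter bucketWriter = sfWriters.get(lookupPath);
if (bucketWriter == null) {
  // HDFSSequenceFile, HDFSDataStream or HDFSCompressedDataStream,
  // depending on the hdfs.fileType setting
  HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
  // hypothetical helper wrapping the real BucketWriter constructor
  bucketWriter = createBucketWriter(lookupPath, hdfsWriter);
  sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
```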
Append Data and Flush
Before appending data, BucketWriter first checks whether it is open. If not, it asks its underlying HDFSWriter to open a new file on the HDFS filesystem. Take HDFSCompressedDataStream for instance:
```java
public void open(String filePath, CompressionCodec codec) {
```
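Condensed, the open logic does roughly the following; error handling and the append-to-existing-file branch are omitted in this sketch.

```java
// Open the destination file, wrap it with a compression stream, and build
// the configured serializer on top of it.
Configuration conf = new Configuration();
Path dstPath = new Path(filePath);
FileSystem hdfs = dstPath.getFileSystem(conf);

FSDataOutputStream fsOut = hdfs.create(dstPath);              // the ".tmp" file
Compressor compressor = CodecPool.getCompressor(codec, conf);
CompressionOutputStream cmpOut = codec.createOutputStream(fsOut, compressor);

// serializerType defaults to "TEXT", i.e. BodyTextEventSerializer
EventSerializer serializer =
    EventSerializerFactory.getInstance(serializerType, serializerContext, cmpOut);
serializer.afterCreate();
```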
Flume's default serializerType is TEXT, i.e. BodyTextEventSerializer, which simply writes the event body to the output stream.
```java
public void write(Event e) throws IOException {
```
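Its body is essentially just this (a sketch; appendNewline is the serializer's own configuration property):

```java
// BodyTextEventSerializer#write, in essence: dump the raw event body,
// optionally followed by a newline.
public void write(Event e) throws IOException {
  out.write(e.getBody());
  if (appendNewline) {
    out.write('\n');
  }
}
```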
When a BucketWriter is about to close or re-open, it calls sync on the HDFSWriter, which in turn calls flush on the serializer and the underlying output stream.
```java
public void sync() throws IOException {
```
Since Hadoop 0.21.0, the Syncable#sync method has been split into hflush and hsync: the former merely flushes data out of the client's buffer, while the latter guarantees that the data has been synced to the disk devices. To support both the old and new APIs, Flume uses Java reflection to determine whether hflush exists, and falls back to sync otherwise. The flushOrSync method then invokes whichever method was found.
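The reflection trick looks roughly like this; the class and method names here (FlushMethodResolver, findFlushMethod) are illustrative, not copied from the Flume source.

```java
import java.lang.reflect.Method;
import org.apache.hadoop.fs.FSDataOutputStream;

public final class FlushMethodResolver {
  // Resolve hflush (Hadoop >= 0.21.0) or fall back to the legacy sync().
  static Method findFlushMethod() {
    try {
      return FSDataOutputStream.class.getMethod("hflush");
    } catch (NoSuchMethodException e) {
      try {
        return FSDataOutputStream.class.getMethod("sync");
      } catch (NoSuchMethodException e2) {
        throw new IllegalStateException("neither hflush nor sync is available", e2);
      }
    }
  }
}
```

flushOrSync then simply calls Method#invoke on the resolved method, with the output stream as the target.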
File Rotation
In the HDFS sink, files can be rotated by file size, event count, or time interval. BucketWriter#shouldRotate is called on every append:
```java
private boolean shouldRotate() {
```
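Stripped down, the size- and count-based checks boil down to the following sketch; setting rollCount or rollSize to 0 disables the corresponding condition, and recent versions also rotate files whose blocks are under-replicated.

```java
// Rotate when enough events or enough bytes have been written to this file.
private boolean shouldRotate() {
  boolean doRotate = false;
  if ((rollCount > 0) && (rollCount <= eventCounter)) {
    doRotate = true;   // event-count-based rolling
  }
  if ((rollSize > 0) && (rollSize <= processSize)) {
    doRotate = true;   // size-based rolling
  }
  return doRotate;
}
```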
Time-based rolling, on the other hand, is scheduled in the previously mentioned timedRollerPool:
```java
private void open() throws IOException, InterruptedException {
```
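Inside open, when rollInterval is greater than zero, a close task is scheduled on that pool. This is a sketch; the real callable also guards against the writer having been closed already.

```java
// Schedule an automatic roll (close) of this bucket after rollInterval seconds.
if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      close(true);   // time-based rolling: close the file, triggering callbacks
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval, TimeUnit.SECONDS);
}
```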
Close and Stop
In HDFSEventSink#close, the sink iterates over every BucketWriter and calls its close method, which in turn calls the underlying HDFSWriter's close method. This does mostly the same work as flush, but also closes the output stream and invokes some callbacks, such as removing the current BucketWriter from the sfWriters hash map.
```java
public synchronized void close(boolean callCloseCallback) {
```
The onCloseCallback is passed in from HDFSEventSink when the BucketWriter is initialized:
```java
WriterCallback closeCallback = new WriterCallback() {
```
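Its job is essentially to forget the writer, so that a later event for the same path creates a fresh file. The following is a sketch; synchronization around sfWriters is omitted.

```java
// When a BucketWriter has finished closing, drop it from the cache of
// open writers.
WriterCallback closeCallback = new WriterCallback() {
  @Override
  public void run(String bucketPath) {
    sfWriters.remove(bucketPath);
  }
};
```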
After all BucketWriters are closed, HDFSEventSink shuts down the callTimeoutPool and timedRollerPool executor services.
```java
ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool };
```
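Each pool is then drained roughly like this (a sketch; logging is omitted):

```java
// Shut down both pools and wait for any in-flight HDFS calls or roll tasks.
for (ExecutorService execService : toShutdown) {
  execService.shutdown();
  try {
    while (!execService.isTerminated()) {
      execService.awaitTermination(500, TimeUnit.MILLISECONDS);
    }
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();  // stop waiting if we are interrupted
    break;
  }
}
```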