# Apache Flume – Get logs out of RabbitMQ and into HDFS

This post is an extension of Tutorial 12 from Hortonworks (original here), which shows how to use Apache Flume to consume entries from a log file and put them into HDFS.

In this post I’m going to further extend the original tutorial to show how to use Apache Flume to read log entries from a RabbitMQ queue.

Apache Flume is described by the folks at Hortonworks as:

> Apache™ Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming data into the Hadoop Distributed File System (HDFS). It has a simple and flexible architecture based on streaming data flows, and is robust and fault tolerant with tunable reliability mechanisms for failover and recovery.

## Overview

- Installation and configuration of Flume
- Generating fake server logs into RabbitMQ

To follow along you will need to:

## Installing Flume

With the Sandbox up and running, press Alt and F5 to bring up the login screen. You can log in using the default credentials:

login: root
password: hadoop

After you've logged in, type:

yum install -y flume

You should now see the installation progressing until it says Complete!

For more details on installation take a look at Tutorial 12 from Hortonworks.

Take the flume.conf file that is part of my tutorial files and follow the instructions from the original tutorial to upload it into the Sandbox. Before uploading the file, check that the RabbitMQ configuration matches your system:

sandbox.sources.rabbitmq_source1.hostname = 192.168.56.65
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs

You shouldn’t need to change anything else.
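Flume properties files are plain `key = value` lines prefixed with the agent name (here `sandbox`). If you want to sanity-check the values before uploading, here is a minimal sketch of a parser; it is not part of the tutorial files, and the property names are simply the ones shown above:

```python
# Minimal sketch: parse Flume-style properties lines into a dict so the
# RabbitMQ source settings can be checked before uploading flume.conf.

def parse_flume_props(text):
    """Parse 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

conf = """
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.65
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs
"""

props = parse_flume_props(conf)
prefix = "sandbox.sources.rabbitmq_source1."
assert props[prefix + "queuename"] == "logs"
assert props[prefix + "port"].isdigit()
print(props[prefix + "hostname"])  # -> 192.168.56.65
```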

For Flume to be able to consume from a RabbitMQ queue, I created a new plugins directory and then uploaded the Flume-ng RabbitMQ library.

Creating the required directories can be done from the Sandbox console with the following command:

mkdir /usr/lib/flume/plugins.d
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq/lib

Once these directories have been created, upload the flume-rabbitmq-channel-1.0-SNAPSHOT.jar file into the lib directory.

## Starting Flume

From the Sandbox console, execute the following command (-c points at the configuration directory, -f at the configuration file, and -n names the agent, which must match the property prefix used in flume.conf):

flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n sandbox

## Generate server logs into RabbitMQ

To generate log entries I took the original python script (which appended entries to the end of a log file), and modified it to publish log entries to RabbitMQ.

To run the python script you will need the pika client library; installation instructions are on the RabbitMQ website.

The script is set up to connect to a broker on localhost, into a virtual host called "logs". You will need to make sure that the virtual host exists.

You can start the script by running:

python.exe c:\path\to\generate_logs.py

Once started, the script will declare an exchange and queue and then start publishing log entries.
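The modified script itself is in the tutorial files download; as an illustration, the declare-and-publish flow looks roughly like the sketch below. The exchange/queue names ("logs") and the '|'-delimited payload format are my assumptions here, so check them against the downloaded script.

```python
# Rough sketch of what the modified generate_logs script does: declare an
# exchange and a queue, bind them, then publish '|'-delimited log entries.
import datetime
import random

def build_log_entry(ip, country, status):
    """Build a '|'-delimited entry: time|ip|country|status."""
    time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return "|".join([time, ip, country, status])

def publish_entries(count=10):
    # pika is imported here so build_log_entry stays usable without a broker
    import pika
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost", virtual_host="logs"))
    channel = connection.channel()
    # Declare the exchange and queue, then bind them so publishes are routed
    channel.exchange_declare(exchange="logs", exchange_type="direct")
    channel.queue_declare(queue="logs")
    channel.queue_bind(queue="logs", exchange="logs", routing_key="logs")
    for _ in range(count):
        entry = build_log_entry("10.0.0.%d" % random.randint(1, 254),
                                "US", random.choice(["SUCCESS", "ERROR"]))
        channel.basic_publish(exchange="logs", routing_key="logs", body=entry)
    connection.close()

# publish_entries() needs a running broker, so only the builder is run here:
print(build_log_entry("192.168.56.1", "AU", "SUCCESS"))
```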

You can see that everything is running by going over to the RabbitMQ Management console.

The incoming messages are the ones being generated by the Python script, and the deliver/get and ack messages are the ones being consumed by Flume.

## Setting up HCatalog

The following command (from the original tutorial) can be used to create the HCatalog table (make sure you enter it on a single line):

hcat -e "CREATE TABLE firewall_logs (time STRING, ip STRING, country STRING, status STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION '/flume/rab_events';"

You should now be able to browse this table from the web interface.
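Since the table uses ROW FORMAT DELIMITED FIELDS TERMINATED BY '|', each event line Flume writes maps onto the four columns by splitting on the pipe character. A tiny illustration (the sample line below is made up):

```python
# Sketch of how the firewall_logs table interprets each event line:
# the pipe-delimited fields map onto the table's four columns in order.
columns = ["time", "ip", "country", "status"]
line = "2014-10-13 06:21:26|192.168.56.65|AU|SUCCESS"
row = dict(zip(columns, line.split("|")))
print(row["status"])  # -> SUCCESS
```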

To do some analysis on this data you can now follow steps 5 and 6 from the original tutorial.

## 21 thoughts on “Apache Flume – Get logs out of RabbitMQ and into HDFS”

1. What does "/flume/rab_events" mean in the hcat command, i.e. how does Flume consume the RabbitMQ messages from the "logs" queue?
Also, can you post the updates to generate_logs.py?

2. Hi Jack,
"/flume/rab_events" is the path on HDFS to the events that Flume has consumed. This *should* be in the conf file for Flume, but it appears to be missing; I'll have to try and dig it up and update my post. As for generate_logs.py, you can get that by downloading the tutorial files (https://www.dropbox.com/s/8kjfblngcnf55qo/flume.zip)

1. Hi Ken

So flume.conf should have a new sink with a path to /flume/rab_events?

# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/rab_events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

1. Correct. I just so happen to have that bit missing from the example in my post. Good spot!

2. You will also need to bind the sink to the source using a channel. Check out the sample from the original tutorial files for how to do this.
If you can’t get it to work then let me know and I’ll see if I can dig up the flume.conf file that I originally used.

3. Hi Ken,

Here is my entire flume.conf, am I missing anything?

sandbox.sources = eventlog
sandbox.channels = file_channel
sandbox.sinks = sink_to_hdfs

# Define / Configure source
sandbox.sources.eventlog.type = exec
sandbox.sources.eventlog.command = tail -F /var/log/eventlog-demo.log
sandbox.sources.eventlog.restart = true
sandbox.sources.eventlog.batchSize = 1000
#sandbox.sources.eventlog.type = seq

# Define Rabbit Source
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.101
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs

# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

# Use a channel which buffers events in memory
sandbox.channels.file_channel.type = file
sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint

# Bind the source and sink to the channel
sandbox.sources.eventlog.channels = file_channel
sandbox.sinks.sink_to_hdfs.channel = file_channel

1. You will need to change it so that you are binding the rabbitmq source to the sink. You can also remove the eventlog source, so something more like what I've pasted below. If this doesn't work let me know and I'll have a look around for the one I originally used.

sandbox.sources = rabbitmq_source1
sandbox.channels = file_channel
sandbox.sinks = sink_to_hdfs

# Define Rabbit Source
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.101
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs

# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

# Use a channel which buffers events in memory
sandbox.channels.file_channel.type = file
sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint

# Bind the source and sink to the channel
sandbox.sources.rabbitmq_source1.channels = file_channel
sandbox.sinks.sink_to_hdfs.channel = file_channel

4. Hi Ken

I am now getting the below exception when running the python script. I created the "logs" virtual host on RabbitMQ, but I am wondering if there is a permission issue trying to access it.

2014-10-13T06:21:26
Traceback (most recent call last):
File "generate_logs_rabbit.py", line 112, in
main()
File "generate_logs_rabbit.py", line 97, in main
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', virtual_host='logs'))
File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 61, in __init__
super(BaseConnection, self).__init__(parameters, on_open_callback)
File "/usr/lib/python2.6/site-packages/pika/connection.py", line 513, in __init__
self._connect()
File "/usr/lib/python2.6/site-packages/pika/connection.py", line 804, in _connect
self.process_data_events()
File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
return self._handle_error(error)
File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 264, in _handle_error
self._handle_disconnect()
File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 181, in _handle_disconnect
self._on_connection_closed(None, True)
File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 235, in _on_connection_closed
raise exceptions.AMQPConnectionError(*self.closing)
pika.exceptions.AMQPConnectionError: (0, '')

1. Make sure that the user you have set up has permissions assigned to that virtual host.

5. Hi Ken,

I'm getting an exception when starting Flume; can you please send your flume.conf, as I am clearly misconfiguring the RabbitMQ/Flume sink.

16 Oct 2014 04:20:59,696 WARN [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources:589) - Could not configure source rabbitmq_source1 due to: Component has no type. Cannot configure. rabbitmq_source1
org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. rabbitmq_source1
at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:55)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:566)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:345)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
at org.apache.flume.conf.FlumeConfiguration.(FlumeConfiguration.java:108)

1. Hi Jack, take a look at the comments below from Mohan, I think that should fix your problem.

6. Hi Ken,

I followed all your instructions and set up Flume within the HDP sandbox, but it is not consuming messages from the queue. I don't see any error either. Please look at my configuration (I have masked my IP). Am I doing anything wrong?

sandbox.sources = rabbitmq_source1
sandbox.channels = file_channel
sandbox.sinks = sink_to_hdfs

# Define Rabbit Source
sandbox.sources.rabbitmq_source1.hostname = X.X.X.X
sandbox.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
sandbox.sources.rabbitmq_source1.queuename = serverlogs.usage.queue
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 55672
sandbox.sources.rabbitmq_source1.virtualhost = /

# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.rabbitmq-sink1.type = org.apache.flume.sink.rabbitmq.RabbitMQSink
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/rab_events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

# Use a channel which buffers events in memory
sandbox.channels.file_channel.type = file
sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
sandbox.channels.file_channel.dataDirs = /var/flume/data

# Bind the source and sink to the channel
sandbox.sources.rabbitmq_source1.channels = file_channel
sandbox.sinks.sink_to_hdfs.channel = file_channel

1. actually I am getting error with above config

23 Jan 2015 08:31:11,203 WARN [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources:589) - Could not configure source rabbitmq_source1 due to: Component has no type. Cannot configure. rabbitmq_source1
org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. rabbitmq_source1
at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:55)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:566)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:345)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
at org.apache.flume.conf.FlumeConfiguration.(FlumeConfiguration.java:108)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:193)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:94)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

1. Hi Mohan, did you copy the rabbitmq jar file to the plugins directory?

7. Yes, I did. Looks like it is not able to locate the jar at run time. I put the following in flume-env.conf but it doesn't work:

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Environment variables can be set here.

# export JAVA_HOME=/usr/lib/jvm/java-6-sun

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS=”-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote”

# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH=/usr/lib/flume/plugins.d/flume-rabbitmq/lib/

# export HIVE_HOME=/usr/lib/hive
# export HCAT_HOME=/usr/lib/hive-hcatalog

1. I fixed it. In the environment file, give the full path of the jar instead of just the directory path:
FLUME_CLASSPATH=/usr/lib/flume/plugins.d/flume-rabbitmq/lib/flume-rabbitmq-channel-1.0-SNAPSHOT.jar

Thanks Ken , your article rocks 🙂

1. actually in quotes
FLUME_CLASSPATH="/usr/lib/flume/plugins.d/flume-rabbitmq/lib/flume-rabbitmq-channel-1.0-SNAPSHOT.jar"

2. That’s great news, glad you managed to get it to work. I’ll update the article with your find. Cheers

8. Good post Ken. Thanks!

I am trying this as well, but not on the HDP single-node sandbox. I am using the final copy of Mohan's conf file and have ensured the plug-in jar is set in FLUME_CLASSPATH, but I still get the below error.

Error:
2015-01-29 09:50:55,932 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed rabbitmq_source1 due to Component has no type. Cannot configure. rabbitmq_source1

On the RabbitMQ I just have a queue to which I am publishing events. Should I be doing anything more on RabbitMQ?

1. Hi Ravi, it looks like the problem is related to your configuration file, specifically the source section. Can you double-check that the values are correct for your environment?