This post is an extension of Tutorial 12 from Hortonworks (original here), which shows how to use Apache Flume to consume entries from a log file and put them into HDFS.
One of the problems that I see with the Hortonworks sandbox tutorials (and don't get me wrong, I think they are great) is that they either assume you already have data loaded into your cluster, or they demonstrate an unrealistic way of loading data: uploading a CSV file through your web browser. One of the exceptions to this is Tutorial 12, which shows how to use Apache Flume to monitor a log file and insert its contents into HDFS.
In this post I’m going to further extend the original tutorial to show how to use Apache Flume to read log entries from a RabbitMQ queue.
Apache Flume is described by the folk at Hortonworks as:
Apache™ Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming data into the Hadoop Distributed File System (HDFS). It has a simple and flexible architecture based on streaming data flows; and is robust and fault tolerant with tunable reliability mechanisms for failover and recovery.
…continue reading about Apache Flume over on the Hortonworks website.
Overview
In this article I will cover the following:
- Installation and Configuration of Flume
- Generating fake server logs into RabbitMQ
To follow along you will need to:
- Download tutorial files
- Download and configure the Hortonworks Sandbox
- Have a RabbitMQ broker running, and accessible to the sandbox
- Have Python 2.7 installed
Installing Flume
With the Sandbox up and running, press Alt and F5 to bring up the login screen. You can log in using the default credentials:
login: root
password: hadoop
After you’ve logged in type:
yum install -y flume
You should now see the installation progressing until it says Complete!
For more details on installation take a look at Tutorial 12 from Hortonworks.
Using the flume.conf file that is part of my tutorial files, follow the instructions from the tutorial to upload it into the sandbox. Before uploading the file, check that the RabbitMQ configuration matches your system:
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.65
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs
You shouldn’t need to change anything else.
For Flume to be able to consume from a RabbitMQ queue, I created a new plugins directory and then uploaded the Flume-ng RabbitMQ library.
Creating the required directories can be done from the Sandbox console with the following command:
mkdir /usr/lib/flume/plugins.d
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq/lib
Once these directories have been created, upload the flume-rabbitmq-channel-1.0-SNAPSHOT.jar file into the lib directory.
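You can check that the jar is in place with:
ls -R /usr/lib/flume/plugins.d
Note that, depending on the Flume version, jars under plugins.d may not be picked up automatically; if Flume later complains that it cannot find the RabbitMQ source class, see the FLUME_CLASSPATH fix in the comments below.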
Starting Flume
From the Sandbox console, execute the following command:
flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n sandbox
Generating server logs into RabbitMQ
To generate log entries I took the original Python script (which appended entries to the end of a log file) and modified it to publish log entries to RabbitMQ.
To run the Python script you will need to install the pika client library (see the instructions on the RabbitMQ website).
The script is set up to connect to a broker on localhost, using a virtual host called "logs". You will need to make sure that this virtual host exists; you can create it as shown below.
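If the virtual host doesn't exist yet, one way to create it (assuming you can run commands on the broker host) is:
rabbitmqctl add_vhost logs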
You can start the script by running:
python.exe c:\path\to\generate_logs.py
When this is started, the script will declare an exchange and a queue and then start publishing log entries.
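The exact script is in the tutorial download, but as a rough sketch of what it does (queue name, log fields, and timing here are illustrative, and this simplified version publishes via the default exchange rather than declaring its own):

import time
import pika

# Connect to the local broker, using the "logs" virtual host
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', virtual_host='logs'))
channel = connection.channel()

# Make sure the queue that Flume will consume from exists
channel.queue_declare(queue='logs')

while True:
    # Pipe-delimited fields matching the firewall_logs table: time|ip|country|status
    entry = '%s|10.0.0.1|US|ACCEPT' % time.strftime('%Y-%m-%d %H:%M:%S')
    channel.basic_publish(exchange='', routing_key='logs', body=entry)
    time.sleep(1)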
You can see that everything is running by going over to the RabbitMQ Management console.
The incoming messages are the ones being generated by the Python script, and the deliver/get and ack messages are the ones being consumed by Flume.
Setting up HCatalog
The following command (from the original tutorial) can be used to create the HCatalog table (make sure you enter it on a single line):
hcat -e "CREATE TABLE firewall_logs (time STRING, ip STRING, country STRING, status STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION '/flume/rab_events';"
You should now be able to browse this table from the web interface.
To do some analysis on this data you can now follow steps 5 and 6 from the original tutorial.
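For example, once the table has data you could count log entries by country with a query along these lines (an illustrative query, not part of the original tutorial):
hive -e "SELECT country, COUNT(*) FROM firewall_logs GROUP BY country;"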
What does "/flume/rab_events" mean in the hcat command, i.e., how does Flume consume the RabbitMQ messages from the "logs" queue?
Also, can you post the updates to generate_logs.py?
Hi Jack,
"/flume/rab_events" is the path on HDFS to the events that Flume has consumed. This *should* be in the conf file for Flume, but it appears to be missing; I'll have to try and dig it up and update my post. As for generate_logs.py, you can get that by downloading the tutorial files (https://www.dropbox.com/s/8kjfblngcnf55qo/flume.zip)
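You can check that events are actually arriving by listing that directory from the sandbox console:
hadoop fs -ls /flume/rab_events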
Hi Ken
So flume.conf should have a new sink with a path of /flume/rab_events?
# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/rab_events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000
Correct. I just so happen to have that bit missing from the example in my post. Good spot!
You will also need to bind the sink to the source using a channel. Check out the sample from the original tutorial files for how to do this.
If you can’t get it to work then let me know and I’ll see if I can dig up the flume.conf file that I originally used.
Hi Ken,
Here is my entire flume.conf, am I missing anything?
sandbox.sources = eventlog
sandbox.channels = file_channel
sandbox.sinks = sink_to_hdfs
# Define / Configure source
sandbox.sources.eventlog.type = exec
sandbox.sources.eventlog.command = tail -F /var/log/eventlog-demo.log
sandbox.sources.eventlog.restart = true
sandbox.sources.eventlog.batchSize = 1000
#sandbox.sources.eventlog.type = seq
# Define Rabbit Source
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.101
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs
# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000
# Use a channel which buffers events in memory
sandbox.channels.file_channel.type = file
sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
sandbox.channels.file_channel.dataDirs = /var/flume/data
# Bind the source and sink to the channel
sandbox.sources.eventlog.channels = file_channel
sandbox.sinks.sink_to_hdfs.channel = file_channel
You will need to change it so that you are binding the RabbitMQ source to the sink. You can also remove the eventlog source, giving something more like what I've pasted below. If this doesn't work, let me know and I'll have a look around for the one I originally used.
sandbox.sources = rabbitmq_source1
sandbox.channels = file_channel
sandbox.sinks = sink_to_hdfs
# Define Rabbit Source
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.101
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs
# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000
# Use a channel which buffers events in memory
sandbox.channels.file_channel.type = file
sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
sandbox.channels.file_channel.dataDirs = /var/flume/data
# Bind the source and sink to the channel
sandbox.sources.rabbitmq_source1.channels = file_channel
sandbox.sinks.sink_to_hdfs.channel = file_channel
Hi Ken
I am now getting the below exception when running the Python script. I created the "logs" virtual host on RabbitMQ, but I am wondering if there is a permission issue trying to access it.
2014-10-13T06:21:26
Traceback (most recent call last):
  File "generate_logs_rabbit.py", line 112, in <module>
    main()
  File "generate_logs_rabbit.py", line 97, in main
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',virtual_host='logs'))
  File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 61, in __init__
    super(BaseConnection, self).__init__(parameters, on_open_callback)
  File "/usr/lib/python2.6/site-packages/pika/connection.py", line 513, in __init__
    self._connect()
  File "/usr/lib/python2.6/site-packages/pika/connection.py", line 804, in _connect
    self._adapter_connect()
  File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 146, in _adapter_connect
    self.process_data_events()
  File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
    if self._handle_read():
  File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
    super(BlockingConnection, self)._handle_read()
  File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 300, in _handle_read
    return self._handle_error(error)
  File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 264, in _handle_error
    self._handle_disconnect()
  File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 181, in _handle_disconnect
    self._on_connection_closed(None, True)
  File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 235, in _on_connection_closed
    raise exceptions.AMQPConnectionError(*self.closing)
pika.exceptions.AMQPConnectionError: (0, '')
Make sure that the user you have set up has permissions assigned to that virtual host.
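For example, something like this on the broker host should do it (assuming the default guest user):
rabbitmqctl set_permissions -p logs guest ".*" ".*" ".*"
rabbitmqctl list_permissions -p logs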
Hi Ken,
I'm getting an exception when starting Flume, so can you please send on your flume.conf, as I am clearly misconfiguring the RabbitMQ/Flume sink?
16 Oct 2014 04:20:59,696 WARN [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources:589) - Could not configure source rabbitmq_source1 due to: Component has no type. Cannot configure. rabbitmq_source1
org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. rabbitmq_source1
at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:55)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:566)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:345)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
I'll have a look for it tonight. In the meantime, try adding:
sandbox.sources.rabbitmq_source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
(or check out https://github.com/jcustenborder/flume-ng-rabbitmq for more details)
Hi Jack, take a look at the comments below from Mohan; I think that should fix your problem.
Hi Ken,
I followed all your instructions and set up Flume within the HDP sandbox, but it is not consuming messages from the queue. I don't see any error either. Please look at my configuration.
I have masked my IP. Am I doing anything wrong?
===============================
sandbox.sources = rabbitmq_source1
sandbox.channels = file_channel
sandbox.sinks = sink_to_hdfs
# Define Rabbit Source
sandbox.sources.rabbitmq_source1.hostname = X.X.X.X
sandbox.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
sandbox.sources.rabbitmq_source1.queuename = serverlogs.usage.queue
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 55672
sandbox.sources.rabbitmq_source1.virtualhost = /
# HDFS sinks
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.rabbitmq-sink1.type = org.apache.flume.sink.rabbitmq.RabbitMQSink
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/rab_events
sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000
# Use a channel which buffers events in memory
sandbox.channels.file_channel.type = file
sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
sandbox.channels.file_channel.dataDirs = /var/flume/data
# Bind the source and sink to the channel
sandbox.sources.rabbitmq_source1.channels = file_channel
sandbox.sinks.sink_to_hdfs.channel = file_channel
Actually, I am getting an error with the above config:
23 Jan 2015 08:31:11,203 WARN [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources:589) - Could not configure source rabbitmq_source1 due to: Component has no type. Cannot configure. rabbitmq_source1
org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. rabbitmq_source1
at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:55)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:566)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:345)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:193)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:94)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Hi Mohan, did you copy the rabbitmq jar file to the plugins directory?
Yes, I did. Looks like it is not able to locate the jar at run time. I put the following in flume-env.conf but it doesn't work:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
# Environment variables can be set here.
# export JAVA_HOME=/usr/lib/jvm/java-6-sun
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH=/usr/lib/flume/plugins.d/flume-rabbitmq/lib/
# export HIVE_HOME=/usr/lib/hive
# export HCAT_HOME=/usr/lib/hive-hcatalog
I fixed it. In the environment file, give the full path of the jar instead of just the directory path:
FLUME_CLASSPATH=/usr/lib/flume/plugins.d/flume-rabbitmq/lib/flume-rabbitmq-channel-1.0-SNAPSHOT.jar
Thanks Ken, your article rocks 🙂
Actually, in quotes:
FLUME_CLASSPATH="/usr/lib/flume/plugins.d/flume-rabbitmq/lib/flume-rabbitmq-channel-1.0-SNAPSHOT.jar"
That’s great news, glad you managed to get it to work. I’ll update the article with your find. Cheers
Good post Ken. Thanks!
I am trying this as well, but not on the HDP single-node sandbox. I am using the final copy of Mohan's conf file and ensured the plugin jar is set in FLUME_CLASSPATH, but I still get the below error.
Error:
2015-01-29 09:50:55,932 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed rabbitmq_source1 due to Component has no type. Cannot configure. rabbitmq_source1
On the RabbitMQ I just have a queue to which I am publishing events. Should I be doing anything more on RabbitMQ?
Hi Ravi, it looks like the problem is related to your configuration file, specifically the source section. Can you double-check that the values are correct for your environment?