Apache Flume – Get logs out of RabbitMQ and into HDFS

This post is an extension of Tutorial 12 from Hortonworks (original here), which shows how to use Apache Flume to consume entries from a log file and put them into HDFS.

One of the problems that I see with the Hortonworks sandbox tutorials (and don't get me wrong, I think they are great) is that they either assume you already have data loaded into your cluster, or they demonstrate an unrealistic way of loading it: uploading a CSV file through your web browser. One of the exceptions is Tutorial 12, which shows how to use Apache Flume to monitor a log file and insert its contents into HDFS.

In this post I’m going to further extend the original tutorial to show how to use Apache Flume to read log entries from a RabbitMQ queue.

Apache Flume is described by the folks at Hortonworks as:

Apache™ Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming data into the Hadoop Distributed File System (HDFS). It has a simple and flexible architecture based on streaming data flows; and is robust and fault tolerant with tunable reliability mechanisms for failover and recovery.

…continue reading about Apache Flume over on the Hortonworks website.

Overview

In this article I will cover the following:

  • Installing and configuring Flume
  • Starting Flume
  • Generating fake server logs into RabbitMQ
  • Setting up HCatalog

To follow along you will need:

  • A running Hortonworks Sandbox VM
  • A RabbitMQ broker that the sandbox can reach
  • My tutorial files, which include flume.conf and the modified generate_logs.py script

Installing Flume

With the Sandbox up and running, press Alt+F5 to bring up the login screen. You can log in using the default credentials:

login: root
password: hadoop

After you've logged in, type:

yum install -y flume

You should now see the installation progressing until it says Complete!

For more details on installation take a look at Tutorial 12 from Hortonworks.

Take the flume.conf file that is part of my tutorial files and upload it into the sandbox, following the instructions from the original tutorial. Before uploading the file, check that the RabbitMQ configuration matches your system:

sandbox.sources.rabbitmq_source1.hostname = 192.168.56.65
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs

You shouldn’t need to change anything else.
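That said, if you end up building flume.conf by hand rather than using the copy from my tutorial files, note that the source also needs a type property so that Flume knows to load the RabbitMQ plugin (this is the class provided by the Flume-ng RabbitMQ library installed below):

sandbox.sources.rabbitmq_source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource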

For Flume to be able to consume from a RabbitMQ queue, I created a new plugins directory and then uploaded the Flume-ng RabbitMQ library into it.

Creating the required directories can be done from the Sandbox console with the following command:

mkdir /usr/lib/flume/plugins.d
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq/lib

Once these directories have been created, upload the flume-rabbitmq-channel-1.0-SNAPSHOT.jar file into the lib directory.
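How you get the jar onto the sandbox is up to you; if you are copying it from another machine, scp is one option (substitute your own sandbox IP address and the local path to the jar):

scp flume-rabbitmq-channel-1.0-SNAPSHOT.jar root@<sandbox-ip>:/usr/lib/flume/plugins.d/flume-rabbitmq/lib/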

Starting Flume

From the Sandbox console, execute the following command:

flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n sandbox
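The -c, -f, and -n switches point Flume at its configuration directory, the configuration file, and the agent name, which must match the prefix used in flume.conf (here, "sandbox"). While testing, it can be handy to add Flume's standard console-logging option so you can watch events flowing:

flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n sandbox -Dflume.root.logger=INFO,console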

Generating server logs into RabbitMQ

To generate log entries I took the original Python script (which appended entries to the end of a log file) and modified it to publish log entries to RabbitMQ instead.

To run the script you will need the pika client library installed; see the instructions on the RabbitMQ website.

The script is set up to connect to a broker on localhost, using a virtual host called "logs". You will need to make sure that this virtual host exists.
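If the virtual host doesn't exist yet, you can create it and give the guest user access with rabbitmqctl (standard RabbitMQ admin commands, run on the broker machine):

rabbitmqctl add_vhost logs
rabbitmqctl set_permissions -p logs guest ".*" ".*" ".*"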

You can start the script by running:

python.exe c:\path\to\generate_logs.py

Once started, the script will declare an exchange and a queue and then start publishing log entries.
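The modified script is in my tutorial files, but the RabbitMQ-specific part boils down to something like the sketch below. This is a simplified, assumed version: the real script generates randomised firewall-style entries, and it declares its own exchange, whereas this sketch publishes via the default exchange for brevity.

import time
import pika

# Connect to the local broker, using the "logs" virtual host (must already exist).
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', virtual_host='logs'))
channel = connection.channel()

# Make sure the queue that Flume reads from exists.
channel.queue_declare(queue='logs', durable=True)

while True:
    # Pipe-delimited fields, to line up with the firewall_logs table:
    # time|ip|country|status
    entry = '%s|10.0.0.1|US|OK' % time.strftime('%Y-%m-%d %H:%M:%S')
    channel.basic_publish(exchange='', routing_key='logs', body=entry)
    time.sleep(1)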

You can confirm that everything is running by going over to the RabbitMQ management console.

[Screenshot: RabbitMQ management console showing message rates while Flume is consuming.]

The incoming messages are the ones being generated by the Python script, and the deliver/get and ack messages are the ones being consumed by Flume.

Setting up HCatalog

The following command (from the original tutorial) can be used to create the HCatalog table (make sure you enter it on a single line):

hcat -e "CREATE TABLE firewall_logs (time STRING, ip STRING, country STRING, status STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION '/flume/rab_events';"

You should now be able to browse this table from the web interface.
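You can also confirm that Flume is actually writing files by listing the sink directory from the sandbox console:

hadoop fs -ls /flume/rab_events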

To do some analysis on this data you can now follow steps 5 and 6 from the original tutorial.
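As a quick taste of what those steps cover, here is an illustrative query of my own (not from the tutorial) that counts log entries per country:

SELECT country, COUNT(*) AS hits
FROM firewall_logs
GROUP BY country
ORDER BY hits DESC;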


21 thoughts on "Apache Flume – Get logs out of RabbitMQ and into HDFS"

  1. Jack says:

    What does "/flume/rab_events" mean in the hcat command, i.e., how does Flume consume the RabbitMQ messages from the "logs" queue?
    Also, can you post the updates to generate_logs.py?

    • Jack says:

      Hi Ken

      So flume.conf should have a new sink with a path of /flume/rab_events?

      # HDFS sinks
      sandbox.sinks.sink_to_hdfs.type = hdfs
      sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
      sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/rab_events
      sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
      sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
      sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

      • KenR says:

        You will also need to bind the sink to the source using a channel. Check out the sample from the original tutorial files for how to do this.
        If you can’t get it to work then let me know and I’ll see if I can dig up the flume.conf file that I originally used.

  2. Jack says:

    Hi Ken,

    Here is my entire flume.conf, am I missing anything?

    sandbox.sources = eventlog
    sandbox.channels = file_channel
    sandbox.sinks = sink_to_hdfs

    # Define / Configure source
    sandbox.sources.eventlog.type = exec
    sandbox.sources.eventlog.command = tail -F /var/log/eventlog-demo.log
    sandbox.sources.eventlog.restart = true
    sandbox.sources.eventlog.batchSize = 1000
    #sandbox.sources.eventlog.type = seq

    # Define Rabbit Source
    sandbox.sources.rabbitmq_source1.hostname = 192.168.56.101
    sandbox.sources.rabbitmq_source1.queuename = logs
    sandbox.sources.rabbitmq_source1.username = guest
    sandbox.sources.rabbitmq_source1.password = guest
    sandbox.sources.rabbitmq_source1.port = 5672
    sandbox.sources.rabbitmq_source1.virtualhost = logs

    # HDFS sinks
    sandbox.sinks.sink_to_hdfs.type = hdfs
    sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
    sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/events
    sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
    sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
    sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

    # Use a channel which buffers events in memory
    sandbox.channels.file_channel.type = file
    sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
    sandbox.channels.file_channel.dataDirs = /var/flume/data

    # Bind the source and sink to the channel
    sandbox.sources.eventlog.channels = file_channel
    sandbox.sinks.sink_to_hdfs.channel = file_channel

    • KenR says:

      You will need to change it so that you are binding the RabbitMQ source to the sink. You can also remove the eventlog source, so something more like what I've pasted below. If this doesn't work let me know and I'll have a look around for the one I originally used.

      sandbox.sources = rabbitmq_source1
      sandbox.channels = file_channel
      sandbox.sinks = sink_to_hdfs

      # Define Rabbit Source
      sandbox.sources.rabbitmq_source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
      sandbox.sources.rabbitmq_source1.hostname = 192.168.56.101
      sandbox.sources.rabbitmq_source1.queuename = logs
      sandbox.sources.rabbitmq_source1.username = guest
      sandbox.sources.rabbitmq_source1.password = guest
      sandbox.sources.rabbitmq_source1.port = 5672
      sandbox.sources.rabbitmq_source1.virtualhost = logs

      # HDFS sinks
      sandbox.sinks.sink_to_hdfs.type = hdfs
      sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
      sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/events
      sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
      sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
      sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

      # Use a file channel, which buffers events on disk
      sandbox.channels.file_channel.type = file
      sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
      sandbox.channels.file_channel.dataDirs = /var/flume/data

      # Bind the source and sink to the channel
      sandbox.sources.rabbitmq_source1.channels = file_channel
      sandbox.sinks.sink_to_hdfs.channel = file_channel

  3. Jack says:

    Hi Ken

    I am now getting the below exception when running the Python script. I created the "logs" virtual host on RabbitMQ, but I am wondering if there is a permission issue trying to access it.

    2014-10-13T06:21:26
    Traceback (most recent call last):
      File "generate_logs_rabbit.py", line 112, in <module>
        main()
      File "generate_logs_rabbit.py", line 97, in main
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', virtual_host='logs'))
      File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 61, in __init__
        super(BaseConnection, self).__init__(parameters, on_open_callback)
      File "/usr/lib/python2.6/site-packages/pika/connection.py", line 513, in __init__
        self._connect()
      File "/usr/lib/python2.6/site-packages/pika/connection.py", line 804, in _connect
        self._adapter_connect()
      File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 146, in _adapter_connect
        self.process_data_events()
      File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
        if self._handle_read():
      File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
        super(BlockingConnection, self)._handle_read()
      File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 300, in _handle_read
        return self._handle_error(error)
      File "/usr/lib/python2.6/site-packages/pika/adapters/base_connection.py", line 264, in _handle_error
        self._handle_disconnect()
      File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 181, in _handle_disconnect
        self._on_connection_closed(None, True)
      File "/usr/lib/python2.6/site-packages/pika/adapters/blocking_connection.py", line 235, in _on_connection_closed
        raise exceptions.AMQPConnectionError(*self.closing)
    pika.exceptions.AMQPConnectionError: (0, '')

  4. Jack says:

    Hi Ken,

    I am getting an exception when starting Flume, so can you please send on your flume.conf, as I am clearly misconfiguring the RabbitMQ/Flume source:

    16 Oct 2014 04:20:59,696 WARN [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources:589) - Could not configure source rabbitmq_source1 due to: Component has no type. Cannot configure. rabbitmq_source1
    org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. rabbitmq_source1
      at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
      at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:55)
      at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:566)
      at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:345)
      at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
      at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
      at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)

  5. Mohan Pindyala says:

    Hi Ken,
    I followed all your instructions and set up Flume within the HDP sandbox, but it is not consuming messages from the queue. I don't see any error either. Please take a look at my configuration below (I have masked my IP). Am I doing anything wrong?
    ===============================
    sandbox.sources = rabbitmq_source1
    sandbox.channels = file_channel
    sandbox.sinks = sink_to_hdfs

    # Define Rabbit Source
    sandbox.sources.rabbitmq_source1.hostname =X.X.X.X
    sandbox.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
    sandbox.sources.rabbitmq_source1.queuename = serverlogs.usage.queue
    sandbox.sources.rabbitmq_source1.username = guest
    sandbox.sources.rabbitmq_source1.password = guest
    sandbox.sources.rabbitmq_source1.port = 55672
    sandbox.sources.rabbitmq_source1.virtualhost = /

    # HDFS sinks
    sandbox.sinks.sink_to_hdfs.type = hdfs
    sandbox.sinks.rabbitmq-sink1.type = org.apache.flume.sink.rabbitmq.RabbitMQSink
    sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream
    sandbox.sinks.sink_to_hdfs.hdfs.path = /flume/rab_events
    sandbox.sinks.sink_to_hdfs.hdfs.filePrefix = eventlog
    sandbox.sinks.sink_to_hdfs.hdfs.fileSuffix = .log
    sandbox.sinks.sink_to_hdfs.hdfs.batchSize = 1000

    # Use a channel which buffers events in memory
    sandbox.channels.file_channel.type = file
    sandbox.channels.file_channel.checkpointDir = /var/flume/checkpoint
    sandbox.channels.file_channel.dataDirs = /var/flume/data

    # Bind the source and sink to the channel
    sandbox.sources.rabbitmq_source1.channels = file_channel
    sandbox.sinks.sink_to_hdfs.channel = file_channel

    • Mohan Pindyala says:

      Actually, I am getting an error with the above config:

      23 Jan 2015 08:31:11,203 WARN [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources:589) - Could not configure source rabbitmq_source1 due to: Component has no type. Cannot configure. rabbitmq_source1
      org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. rabbitmq_source1
        at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
        at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:55)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:566)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:345)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
        at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
        at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
        at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:193)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:94)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

  6. Mohan Pindyala says:

    Yes, I did. It looks like it is not able to locate the jar at run time. I put the following in flume-env.conf but it doesn't work:
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    # If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
    # during Flume startup.

    # Environment variables can be set here.

    # export JAVA_HOME=/usr/lib/jvm/java-6-sun

    # Give Flume more memory and pre-allocate, enable remote monitoring via JMX
    # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

    # Note that the Flume conf directory is always included in the classpath.
    FLUME_CLASSPATH=/usr/lib/flume/plugins.d/flume-rabbitmq/lib/

    # export HIVE_HOME=/usr/lib/hive
    # export HCAT_HOME=/usr/lib/hive-hcatalog

  7. Ravi says:

    Good post Ken. Thanks!

    I am trying this as well, though not on the HDP single-node sandbox. I am using the final copy of Mohan's conf file and have ensured the plugin directory is set in FLUME_CLASSPATH. But I still get the below error.

    Error:
    2015-01-29 09:50:55,932 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed rabbitmq_source1 due to Component has no type. Cannot configure. rabbitmq_source1

    On the RabbitMQ I just have a queue to which I am publishing events. Should I be doing anything more on RabbitMQ?

    • KenR says:

      Hi Ravi, it looks like the problem is related to your configuration file, specifically the source section. Can you double-check that the values are correct for your environment?
