Apache Flume – Get logs out of RabbitMQ and into HDFS

This post is an extension of Tutorial 12 from Hortonworks (original here), which shows how to use Apache Flume to consume entries from a log file and put them into HDFS.

One of the problems that I see with the Hortonworks sandbox tutorials (and don’t get me wrong, I think they are great) is the assumption that you already have data loaded into your cluster, or they demonstrate an unrealistic way of loading data into your cluster – uploading a csv file through your web browser. One of the exceptions to this is tutorial 12, which shows how to use Apache Flume to monitor a log file and insert the contents into HDFS.

In this post I’m going to further extend the original tutorial to show how to use Apache Flume to read log entries from a RabbitMQ queue.

Apache Flume is described by the folk at Hortonworks as:

Apache™ Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming data into the Hadoop Distributed File System (HDFS). It has a simple and flexible architecture based on streaming data flows; and is robust and fault tolerant with tunable reliability mechanisms for failover and recovery.

…continue reading about Apache Flume over on the Hortonworks website.

Overview

In this article I will cover the following:

  • Installation and Configuration of Flume
  • Generating fake server logs into RabbitMQ

To follow along you will need to:

Installing Flume

With the Sandbox up and running, press Alt and F5 to bring up the login screen. You can log in using the default credentials:

login: root
password: hadoop

After you’ve logged in type:

yum install -y flume

You should now see the installation progressing until it says Complete!

For more details on installation take a look at Tutorial 12 from Hortonworks.

Using the flume.conf file that is part of my tutorial files, follow the instructions to upload it into the sandbox from the tutorial. Before uploading the file, you should check that the RabbitMQ configuration matches your system:

sandbox.sources.rabbitmq_source1.hostname = 192.168.56.65
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.username = guest
sandbox.sources.rabbitmq_source1.password = guest
sandbox.sources.rabbitmq_source1.port = 5672
sandbox.sources.rabbitmq_source1.virtualhost = logs

You shouldn’t need to change anything else.
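Before uploading, it can help to confirm the values that are actually in the file. The following is a minimal Python sketch, not part of the original tutorial files, that parses properties-style lines and pulls out the settings for one source. The function name `read_source_settings` and the inline sample text are assumptions made for illustration.

```python
def read_source_settings(conf_text, agent="sandbox", source="rabbitmq_source1"):
    """Return {setting: value} for one Flume source from properties-style text."""
    prefix = f"{agent}.sources.{source}."
    settings = {}
    for raw_line in conf_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        if key.startswith(prefix):
            settings[key[len(prefix):]] = value.strip()
    return settings


# Hypothetical excerpt; in practice read the text of your own flume.conf.
sample = """
sandbox.sources.rabbitmq_source1.hostname = 192.168.56.65
sandbox.sources.rabbitmq_source1.queuename = logs
sandbox.sources.rabbitmq_source1.port = 5672
"""

for name, value in sorted(read_source_settings(sample).items()):
    print(f"{name} = {value}")
```

To check a real file, pass the contents of flume.conf in place of `sample`.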

For Flume to be able to consume from a RabbitMQ queue, I created a new plugins directory and then uploaded the Flume-ng RabbitMQ library.

Creating the required directories can be done from the Sandbox console with the following commands:

mkdir /usr/lib/flume/plugins.d
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq
mkdir /usr/lib/flume/plugins.d/flume-rabbitmq/lib

Once these directories have been created, upload the flume-rabbitmq-channel-1.0-SNAPSHOT.jar file into the lib directory.

Starting Flume

From the Sandbox console, execute the following command:

flume-ng agent -c /etc/flume/conf -f /etc/flume/conf/flume.conf -n sandbox

Generate server logs into RabbitMQ

To generate log entries I took the original python script (which appended entries to the end of a log file), and modified it to publish log entries to RabbitMQ.

To run the python script you will need to follow the instructions on the RabbitMQ site to install the pika client library (see details on the RabbitMQ website).

The script is set up to connect to a broker on localhost, using a virtual host called “logs”. You will need to make sure that the virtual host exists.

You can start the script by running:

python.exe c:\path\to\generate_logs.py

When this is started the script will declare an exchange and queue and then start publishing log entries.
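For readers who do not have the script to hand, here is a rough sketch of what such a publisher could look like. It is not the author's generate_logs.py: the exchange name `logs-exchange`, the queue durability, the timestamp format and the sample countries are all assumptions, and the four pipe-delimited fields simply mirror the columns of the HCatalog table created later in this post. The pika calls need the pika package and a running broker, so the import is kept inside `publish_logs`.

```python
import random
import time


def make_log_line(now=None, rng=random):
    """Build one pipe-delimited entry: time|ip|country|status."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    ip = ".".join(str(rng.randint(1, 254)) for _ in range(4))
    country = rng.choice(["US", "GB", "NZ", "DE"])  # sample values only
    status = rng.choice(["SUCCESS", "ERROR"])       # sample values only
    return "|".join([timestamp, ip, country, status])


def publish_logs(count, host="localhost", virtual_host="logs"):
    """Publish `count` entries; requires the pika package and a running broker."""
    import pika  # imported here so make_log_line works without pika installed

    params = pika.ConnectionParameters(
        host=host,
        virtual_host=virtual_host,
        credentials=pika.PlainCredentials("guest", "guest"),
    )
    connection = pika.BlockingConnection(params)
    try:
        channel = connection.channel()
        # Exchange name and durability are assumptions for this sketch; they
        # must match any existing declaration or the broker rejects them.
        channel.exchange_declare(exchange="logs-exchange", exchange_type="direct")
        channel.queue_declare(queue="logs", durable=True)
        channel.queue_bind(queue="logs", exchange="logs-exchange", routing_key="logs")
        for _ in range(count):
            channel.basic_publish(
                exchange="logs-exchange", routing_key="logs", body=make_log_line()
            )
    finally:
        connection.close()


# Safe to run without a broker: only prints one generated entry.
print(make_log_line())
```

Calling `publish_logs(10)` would then send ten entries to the "logs" queue on the local broker.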

You can see that everything is running by going over to the RabbitMQ Management console.

3-flume-consuming

The incoming messages are the ones being generated by the Python script, and the deliver / get and ack messages are the ones being consumed by Flume.

Setting up HCatalog

The following command (from the original tutorial) can be used to create the HCatalog table (make sure you enter it on a single line):

hcat -e "CREATE TABLE firewall_logs (time STRING, ip STRING, country STRING, status STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION '/flume/rab_events';"

You should now be able to browse this table from the web interface.

To do some analysis on this data you can now follow steps 5 and 6 from the original tutorial.


Loading a Data Warehouse Dimension with RabbitMQ and SSIS

In this post I’m going to take a break from the actual development of custom components for SSIS and show how the RabbitMQ Source that I have created so far could be used in an ETL package to load a data warehouse.

The Scenario

The scenario for this post is a company that sells cool stuff over the internet. They have a website where people can sign up and registered users can place orders.

The way this imaginary system works:

  • When a user signs up, their details are created in the operational database which is used by the website. After creation their details are then sent to RabbitMQ
  • When a user updates their details, they are updated in the operational database and this information is also sent to RabbitMQ
  • When a user places an order, the details are sent to two different RabbitMQ queues for processing:
    • An imaginary process writes these details into the operational database in “real-time”
    • An ETL process runs periodically to load the orders into the data warehouse

In this post I’m going to show how the customer dimension can be loaded from the RabbitMQ queue. In a future post I will show how to load the fact table via SSIS.

My ETL package will do the following:

  • Read from a queue of customers, parse the xml and populate attribute columns
  • Use the SCD (slowly changing dimension) component to populate the customer dimension

Preparing the solution

The only preparation I did before starting was to create a new database with a new table, DimCustomer:

CREATE TABLE [dbo].[DimCustomer](
	[CustomerKey] [int] IDENTITY(1,1) NOT NULL,
	[CustomerBusinessKey] [int] NOT NULL,
	[FirstName] [nvarchar](50) NOT NULL,
	[Surname] [nvarchar](50) NOT NULL,
	[Gender] [char](1) NOT NULL,
	[City] [nvarchar](50) NOT NULL,
	[Country] [nvarchar](50) NOT NULL,
	[MaritalStatus] [char](1) NOT NULL,
	[StartDate] [datetime] NULL,
	[EndDate] [datetime] NULL,
 CONSTRAINT [PK_DimCustomer] PRIMARY KEY CLUSTERED
(
	[CustomerKey] ASC
)
) ON [PRIMARY]

Creating the ETL Package

The ETL Package is going to have a number of different steps that it goes through.

  1. Retrieve customer messages from queue
  2. Parse XML
  3. Load customers into the dimension, accounting for SCD attributes

The end result ended up looking like this:

etl-1

Building the package is actually very straightforward. After creating a new solution and adding a new package, I added a new data flow task.

To the data flow task, I first added the RabbitMQ Source and configured it with a new RabbitMQ Connection manager pointing to my local broker. I also told it to use the customer queue.

Next I added a script component; this is where the xml message from RabbitMQ will be parsed.

The script component is the key to making the messages in RabbitMQ understood in the SSIS package. The script takes in each message and populates a row in a new output buffer. This output buffer contains columns corresponding to the elements in the xml message.

The XML messages that are published look like this:

<customer>
  <id>1</id>
  <firstname>Peter</firstname>
  <surname>Parker</surname>
  <gender>M</gender>
  <maritalstatus>S</maritalstatus>
  <city>New York</city>
  <country>USA</country>
</customer>

To configure the Script component, first add a new output then add each of the fields from the xml above.

etl-2

Next edit the script itself and update the Input0_ProcessInputRow method:

public override void Input0_ProcessInputRow(Input0Buffer Row)
{
  var stringReader = new System.IO.StringReader(Row.MessageContents);
  var xmlDoc = new XmlDocument();
  xmlDoc.Load(stringReader);

  Row.CustomerKey = int.Parse(xmlDoc.GetElementsByTagName("id")[0].InnerText);
  Row.FirstName = xmlDoc.GetElementsByTagName("firstname")[0].InnerText;
  Row.Gender = xmlDoc.GetElementsByTagName("gender")[0].InnerText;
  Row.Surname = xmlDoc.GetElementsByTagName("surname")[0].InnerText;
  Row.City = xmlDoc.GetElementsByTagName("city")[0].InnerText;
  Row.Country = xmlDoc.GetElementsByTagName("country")[0].InnerText;
  Row.MaritalStatus = xmlDoc.GetElementsByTagName("maritalstatus")[0].InnerText;
}
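When building sample messages it can be handy to check them outside SSIS before publishing. The sketch below does the same element-to-column mapping in Python using only the standard library. It is an illustration, not part of the package: `FIELD_MAP` and `parse_customer` are names made up here, and raising an error on a missing element is a choice made for this sketch, whereas the C# version assumes every element is present.

```python
import xml.etree.ElementTree as ET

# Element name in the message -> column name used by the script component.
FIELD_MAP = {
    "id": "CustomerKey",
    "firstname": "FirstName",
    "surname": "Surname",
    "gender": "Gender",
    "city": "City",
    "country": "Country",
    "maritalstatus": "MaritalStatus",
}


def parse_customer(message):
    """Turn one <customer> message into a dict of column values."""
    root = ET.fromstring(message)
    row = {}
    for tag, column in FIELD_MAP.items():
        element = root.find(tag)
        if element is None or element.text is None:
            raise ValueError(f"missing <{tag}> element")
        row[column] = element.text.strip()
    # The id is the only numeric column.
    row["CustomerKey"] = int(row["CustomerKey"])
    return row


sample_message = """
<customer>
  <id>1</id>
  <firstname>Peter</firstname>
  <surname>Parker</surname>
  <gender>M</gender>
  <maritalstatus>S</maritalstatus>
  <city>New York</city>
  <country>USA</country>
</customer>
"""

print(parse_customer(sample_message))
```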

That is it for the script component. The next step is to drag in the Slowly Changing Dimension component. After connecting the script component to it, double-clicking the Slowly Changing Dimension component will bring up the Slowly Changing Dimension Wizard.

The first page of the wizard is where you configure the connection and table of the dimension. After doing that, the wizard will attempt to match the columns in the table to the columns from the component’s input. The next step on this page is to select which column is the business key.

etl-3

The next page is where the SCD types are set for the columns. Historical is SCD Type 2, Changing is SCD Type 1 and Fixed is SCD Type 0.
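To make the three types concrete, here is a small Python sketch of how a change to an existing customer could be classified. It is a simplification for illustration and not a description of how the SSIS component is implemented. The column-to-type assignment in `COLUMN_TYPES` is hypothetical (the choices made in the wizard are only visible in the screenshot), and the precedence used when several kinds of column change at once is an assumption.

```python
FIXED, CHANGING, HISTORICAL = "fixed", "changing", "historical"

# Hypothetical assignment of SCD types to columns.
COLUMN_TYPES = {
    "Gender": FIXED,        # Type 0: never expected to change
    "Surname": CHANGING,    # Type 1: overwrite in place
    "City": HISTORICAL,     # Type 2: keep history with a new row
}


def scd_action(existing, incoming, column_types):
    """Decide what to do with an incoming row for an existing business key."""
    changed = {c for c in column_types if existing.get(c) != incoming.get(c)}
    kinds = {column_types[c] for c in changed}
    if not changed:
        return "unchanged"
    if HISTORICAL in kinds:
        return "expire_and_insert"   # Type 2 behaviour
    if CHANGING in kinds:
        return "update_in_place"     # Type 1 behaviour
    return "fixed_attribute_changed"  # Type 0: report or ignore


old = {"Gender": "M", "Surname": "Parker", "City": "New York"}
new = {"Gender": "M", "Surname": "Man", "City": "Los Angeles"}
print(scd_action(old, new, COLUMN_TYPES))
```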

etl-4

In the next page you can configure how SSIS should deal with different scenarios. Below you can see that I’ve opted not to fail the package if fixed attributes change, and ticked the second option.

etl-5

The next page is used to configure how Type 2 dimensions should be stored in the dimension table, so that they can be recognized.

The wizard gives us two options:

  • Use a status column, the first wizard option, whereby a column can be selected as the indicator column then two values can be selected which correspond to the current or expired flag
  • Use start/end date columns, the second wizard option, whereby two datetime columns are selected, along with which value to use as the date

etl-6

The next page gives the option to enable inferred member support, which is outside the scope of this article. I have left this turned off.

etl-7

After clicking Finish, the wizard will close and the remaining elements, that you can see in the picture of the complete package above, will be added.

Running the package

To demonstrate the package, I’m going to use the RabbitMQ management interface to put some messages into the queue and then run the package.

The first message I’m placing into the queue is this:

<customer>
  <id>1</id>
  <firstname>Peter</firstname>
  <surname>Parker</surname>
  <gender>M</gender>
  <maritalstatus>S</maritalstatus>
  <city>New York</city>
  <country>USA</country>
</customer>

This is the xml sample from above. After running the package I can query the database and get the following output:

etl-demo-1

The next two messages I’m putting in are:

<customer>
  <id>1</id>
  <firstname>Spider</firstname>
  <surname>Man</surname>
  <gender>M</gender>
  <maritalstatus>S</maritalstatus>
  <city>Los Angeles</city>
  <country>USA</country>
</customer>

and

<customer>
<id>2</id>
  <firstname>Clarke</firstname>
  <surname>Kent</surname>
  <gender>M</gender>
  <maritalstatus>S</maritalstatus>
  <city>Kansas</city>
  <country>USA</country>
</customer>

The first is an update to Peter Parker, the second is a new customer. After running the package the output from the table is now this:

etl-demo-2

You can see that the package has identified that Peter Parker is an existing customer, so it expires the old dimension row and adds a new one. It also adds a new row for Clarke Kent.
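The expire-and-insert behaviour can be sketched in a few lines of Python, using an in-memory list in place of the DimCustomer table. This is only an illustration of the Type 2 idea with start/end dates. It treats every attribute as historical, uses a row count in place of the IDENTITY column, and uses arbitrary dates, all of which are assumptions made for the sketch.

```python
from datetime import datetime


def apply_type2(dimension, incoming, now):
    """Expire the current row for this business key if it changed, then add a new row."""
    key = incoming["CustomerBusinessKey"]
    current = next(
        (row for row in dimension
         if row["CustomerBusinessKey"] == key and row["EndDate"] is None),
        None,
    )
    if current is not None:
        if all(current[col] == value for col, value in incoming.items()):
            return  # no attribute changed, keep the current row
        current["EndDate"] = now  # expire the old version
    dimension.append(
        dict(incoming, CustomerKey=len(dimension) + 1, StartDate=now, EndDate=None)
    )


dimension = []
first_run = datetime(2014, 1, 1)   # arbitrary dates for the example
second_run = datetime(2014, 1, 2)

apply_type2(dimension, {"CustomerBusinessKey": 1, "FirstName": "Peter",
                        "Surname": "Parker", "City": "New York"}, first_run)
apply_type2(dimension, {"CustomerBusinessKey": 1, "FirstName": "Spider",
                        "Surname": "Man", "City": "Los Angeles"}, second_run)
apply_type2(dimension, {"CustomerBusinessKey": 2, "FirstName": "Clarke",
                        "Surname": "Kent", "City": "Kansas"}, second_run)

for row in dimension:
    print(row["CustomerKey"], row["FirstName"], row["StartDate"], row["EndDate"])
```

After the three calls the list holds three rows: the original Peter Parker row with an end date, plus current rows for the updated customer and the new one.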

Summary

As you can see, using the RabbitMQ source itself is very easy, and thanks to some of the pre-built components in SSIS, building a package that loads dimensions into a data warehouse is straightforward too.

I’m going to be taking a break from blogging about Custom SSIS Components and start a new series on building a real-time monitoring system with StreamInsight.

Part 4 – Add a Custom UI to the RabbitMQ Source

A lot of the content in this article is similar to what I did in Part 2, so if you haven’t read it I suggest you go back and read it now 🙂

The next part of this series will take a look at implementing a custom UI for configuration of the RabbitMQ Source component. The main reference for this entry can be found on MSDN.

A shout out and my thanks go to Tillmann Eitelberg (Web | Twitter) who pointed me in the right direction when I was trying to figure out how to get a list of all the connection managers in a package. Thanks!

Preparing the Project

To get my project ready I added two new files to the RabbitMQSource project. The first was an empty class file called RabbitMQSourceUI.cs and the second was a Windows Forms class called RabbitMQSourceUIForm.cs.

Then the DtsPipelineComponent attribute on the RabbitMQSource class needs to have the UITypeName property set. This indicates to SSDT BI that the custom source has a user interface for configuration. Just like in Part 2, the UITypeName property is the fully qualified type name of the RabbitMQSourceUI class. (See Part 2 for details on how the fully qualified type name is constructed).

The DtsPipelineComponent attribute, after setting the UITypeName property looks like this:

[DtsPipelineComponent(DisplayName = "RabbitMQ Source",
    ComponentType = ComponentType.SourceAdapter, Description = "Connection source for RabbitMQ",
    UITypeName = "SSISRabbitMQ.RabbitMQSource.RabbitMQSourceUI, SSISRabbitMQ.RabbitMQSource, Version=1.0.0.0, Culture=neutral, PublicKeyToken=ac1c316408dd3955")]

Creating the Custom UI

Open the RabbitMQSourceUI file and have the class implement the IDtsComponentUI interface.

Similar to what I did for the RabbitMQ Connection Manager UI, I’m only implementing some of the methods.

The Initialize and New methods are called when the RabbitMQ Source component is first created. In the Initialize method below, I’m storing the service provider and meta data objects that were passed through, into class variables. These will then be passed through to the Form and used to update the RabbitMQ Source component as well as the package.

public void Initialize(IDTSComponentMetaData100 dtsComponentMetadata, IServiceProvider serviceProvider)
{
  this.serviceProvider = serviceProvider;
  this.metaData = dtsComponentMetadata;

  this.connectionService = (IDtsConnectionService)serviceProvider.GetService(typeof(IDtsConnectionService));
}

The New method is called right after Initialize and it just displays the form.

public void New(IWin32Window parentWindow)
{
  ShowForm(parentWindow);
}

The ShowForm method is used by both the New and Edit methods.


private DialogResult ShowForm(IWin32Window window)
{
  RabbitMQSourceUIForm form = new RabbitMQSourceUIForm(this.metaData, this.serviceProvider);

  return form.ShowDialog(window);
}

The Edit method is called when the user double clicks on the component, or right clicks and selects “Edit”.


public bool Edit(IWin32Window parentWindow, DTSRuntime.Variables variables, DTSRuntime.Connections connections)
{
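  // Edit must return a bool: true when the user confirmed the dialog and the component was modified.
  return ShowForm(parentWindow) == DialogResult.OK;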
}

The next step is to design the user interface. After adding various UI elements to the RabbitMQSourceUIForm, I ended up with a design that looks like this:

ssis-rabbitmq-part4-0

All the code is available on GitHub if you’re interested in seeing how the Form was put together.

The next step is to add some methods in the form class.

In the constructor I am storing the service provider and meta data that were passed through, as well as getting an instance of the IDtsConnectionService (line 6) and an instance of CManagedComponentWrapper (line 7).

public RabbitMQSourceUIForm(Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSComponentMetaData100 metaData, IServiceProvider serviceProvider)
  : this()
{
  this.metaData = metaData;
  this.serviceProvider = serviceProvider;
  this.connectionService = (IDtsConnectionService)serviceProvider.GetService(typeof(IDtsConnectionService));
  this.designTimeInstance = metaData.Instantiate();
}

The connection manager service exposes the connection managers that have already been created in the package. It also provides a number of methods to create new connection managers.

In the RabbitMQSourceUIForm_Load method below, I am using the connection manager service to retrieve a list of all connection managers, checking whether each is a RabbitMQConnectionManager, and then populating the connections combo box. This will let the user select which connection manager to use for the source.

It will also preselect the connection manager currently assigned to the source if there is one and sets the queue name textbox value to the QueueName property if it has been set. This is useful when editing the source.

private void RabbitMQSourceUIForm_Load(object sender, EventArgs e)
{
  var connections = connectionService.GetConnections();

  var queueName = metaData.CustomPropertyCollection[0];
  txtQueueName.Text = queueName.Value;

  string connectionManagerId = string.Empty;

  var currentConnectionManager = this.metaData.RuntimeConnectionCollection[0];
  if (currentConnectionManager != null)
  {
    connectionManagerId = currentConnectionManager.ConnectionManagerID;
  }

  for (int i = 0; i < connections.Count; i++)
  {
    var conn = connections[i].InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager;

    if (conn != null)
    {
      var item = new ConnectionManagerItem()
      {
        Name = connections[i].Name,
        ConnManager = conn,
        ID = connections[i].ID
      };
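      int index = cbConnectionList.Items.Add(item);

      // Select by the combo box index rather than the index into the package's
      // connections, because non-RabbitMQ connection managers are skipped.
      if (connections[i].ID.Equals(connectionManagerId))
      {
        cbConnectionList.SelectedIndex = index;
      }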
    }
  }
}

ConnectionManagerItem is a custom class which I created to populate the combo box – see GitHub source for details.
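One detail to watch when filtering like this: the position of an item in the combo box is not necessarily its position in the full connection list once other connection types are skipped. The Python sketch below illustrates the point with plain dictionaries. The data structure and the function name `build_combo_items` are hypothetical and have nothing to do with the SSIS API.

```python
def build_combo_items(connections, wanted_type, selected_id):
    """Filter connections by type and find the index of the selected one in the filtered list."""
    items = []
    selected_index = -1  # -1 means nothing is preselected
    for conn in connections:
        if conn["type"] != wanted_type:
            continue
        items.append({"name": conn["name"], "id": conn["id"]})
        if conn["id"] == selected_id:
            # Index within the filtered list, not within `connections`.
            selected_index = len(items) - 1
    return items, selected_index


package_connections = [
    {"name": "Warehouse", "id": "a", "type": "OLEDB"},
    {"name": "Local broker", "id": "b", "type": "RABBITMQ"},
    {"name": "Export file", "id": "c", "type": "FILE"},
    {"name": "Remote broker", "id": "d", "type": "RABBITMQ"},
]

items, selected_index = build_combo_items(package_connections, "RABBITMQ", "d")
print([item["name"] for item in items], selected_index)
```

Here the selected connection is the fourth in the package but only the second in the filtered list.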

When the user clicks the OK button two things need to happen: first, the Queue Name property needs to be set to what is on the form, and second, the selected connection manager needs to be associated with the connection defined in the component.

private void btnOK_Click(object sender, EventArgs e)
{
  if (!string.IsNullOrWhiteSpace(txtQueueName.Text))
  {
    designTimeInstance.SetComponentProperty("QueueName", txtQueueName.Text);
  }

  if (cbConnectionList.SelectedItem != null)
  {
    var item = (ConnectionManagerItem)cbConnectionList.SelectedItem;
    this.metaData.RuntimeConnectionCollection[0].ConnectionManagerID = item.ID;
  }

  this.DialogResult = System.Windows.Forms.DialogResult.OK;
  this.Close();
}

If you read Part 3, you will hopefully remember what I said about having to hard-code the ConnectionManagerID property in the ProvideComponentProperties method. The snippet above removes the need for that hard-coded value 🙂

Having used SSIS for some time now, I appreciate that there is a New button on most source components to create a new connection manager, as this streamlines the process of creating sources. So I thought I should also provide the users of my component with the same feature.

In the method below you can see that I am using the connection service to create a new connection. The type is RABBITMQ, which matches the ConnectionType in the DtsConnection attribute on the RabbitMQConnectionManager class that was created in Part 1. What is really nice is that since I implemented a custom UI to configure the RabbitMQ connection manager, it is displayed to the user as soon as they click the New button on the form. This is all done by SSDT BI when the CreateConnection method on the connection service is called. So easy.

The MSDN documentation says it returns an ArrayList which holds all the connections that were created; if the user cancels the creation then it will return an empty list. As far as I can tell it should only return one item in the list.

private void btnNewConnectionManager_Click(object sender, EventArgs e)
{
  System.Collections.ArrayList created = connectionService.CreateConnection("RABBITMQ");

  foreach (ConnectionManager cm in created)
  {
    var item = new ConnectionManagerItem()
    {
      Name = cm.Name,
      ConnManager = cm.InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager,
      ID = cm.ID
    };

    cbConnectionList.Items.Insert(0, item);
  }
}

Adding an Icon

In the screenshots below you will see an icon on the RabbitMQ Source component. To get the icon to show up, I added an ico file to my project then set the Build Action to “Embedded Resource”.

The only other thing left to do is update the DtsPipelineComponent attribute on the RabbitMQSource class. After setting the IconResource property this now looks like:

  [DtsPipelineComponent(IconResource = "SSISRabbitMQ.RabbitMQSource.Rabbit.ico",
    DisplayName = "RabbitMQ Source",
    ComponentType = ComponentType.SourceAdapter,
    Description = "Connection source for RabbitMQ",
    UITypeName = "SSISRabbitMQ.RabbitMQSource.RabbitMQSourceUI, SSISRabbitMQ.RabbitMQSource, Version=1.0.0.0, Culture=neutral, PublicKeyToken=ac1c316408dd3955")]

What it looks like

Now time to see how this makes configuration a much easier experience!

The screenshot below shows what the user sees when they first add a RabbitMQ Source to a data flow task; the dialog box is displayed straight away.

ssis-rabbitmq-part4-1

After clicking the New button, the dialog box created in Part 2 is displayed for the user to set up the RabbitMQ Connection Manager.

ssis-rabbitmq-part4-2

After clicking OK for the connection manager to be created you see that it appears in the drop down.

ssis-rabbitmq-part4-3

Finally, after clicking OK for the source to be created you can see that the source has passed validation, and that a RabbitMQ Connection Manager has been created – note that this no longer needs to be called “RabbitMQ”.

ssis-rabbitmq-part4-4

Up next I’m going to take a short break from the development side of things and show how the RabbitMQ Source that has so far been developed could be used to load a fictional data warehouse.

Part 3 – Building a Custom Source Component

So far in this series I have shown how to create a custom connection manager and also how to create a user friendly interface to configure and test it. In this post I’m going to outline the steps I went through to create a custom source component for SSIS. By the end of this post I will show messages that are in a RabbitMQ queue being consumed by an SSIS package.

Preparing the Solution

The first step is to create a new class library project in the solution. I called mine SSISRabbitMQ.RabbitMQSource. Then add references to the following assemblies:

  • Microsoft.SqlServer.Dts.Design
  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.ManagedDTS
  • Microsoft.SqlServer.PipelineHost

These can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

Then using Part 1 as a reference perform the following steps:

  • Add the RabbitMQ client library via NuGet
  • Add a post build step to copy the output assembly into the GAC

Then add a new class file called RabbitMQSource.cs into the project.

Creating the Custom Source

To have SSDT BI recognize the RabbitMQSource class as a source component, two things need to be done:

  • Apply the DtsPipelineComponent attribute
  • Extend and override methods from the PipelineComponent class (MSDN documentation)

The DtsPipelineComponent attribute is similar to the DtsConnection attribute which was applied to the custom connection manager in the first post. This attribute signals to SSDT BI that it is a custom component.

There are a number of properties that need to be set:

  • DisplayName – this is what will be displayed in the SSIS Toolbox in SSDT BI
  • Description – this will also be displayed in the SSIS Toolbox
  • ComponentType – this tells SSDT BI what kind of component the class is, (Source, Destination or Transformation)

After applying the attribute and extending the PipelineComponent class so far I have this code:

[DtsPipelineComponent(DisplayName = "RabbitMQ Source",
  ComponentType = ComponentType.SourceAdapter,
  Description = "Connection source for RabbitMQ")]
public class RabbitMQSource : PipelineComponent
{
}

Implementing the Custom Source

The first method to override is ProvideComponentProperties. This is the method which is called when the source is added to a package. In here goes all the setup for the source component. In the code below you can see I’m doing three things:

  1. Creating a new output
  2. Creating a custom property for the Queue name
  3. Creating a connection

public override void ProvideComponentProperties()
{
  // Reset the component.
  base.RemoveAllInputsOutputsAndCustomProperties();
  ComponentMetaData.RuntimeConnectionCollection.RemoveAll();

  IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
  output.Name = "Output";

  IDTSCustomProperty100 queueName = ComponentMetaData.CustomPropertyCollection.New();
  queueName.Name = "QueueName";
  queueName.Description = "The name of the RabbitMQ queue to read messages from";

  IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
  connection.Name = "RabbitMQ";
  connection.ConnectionManagerID = "RabbitMQ";

  CreateColumns();
}

Very important and something that took me a little while to figure out was the line:

connection.ConnectionManagerID = "RabbitMQ";

A lot of other tutorials that I read online left this out; however, I found that without it my source didn’t work.

Basically, this line is saying “In the package there needs to be a connection called RabbitMQ”. It will also need to be a RabbitMQConnectionManager, which will become obvious in the AcquireConnections method below.

(In the next post I will be showing how to create a custom user interface for this source which will allow the user to select the appropriate connection manager, which will avoid the need to hard-code this value!)

The CreateColumns method looks like this:

private void CreateColumns()
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

  output.OutputColumnCollection.RemoveAll();
  output.ExternalMetadataColumnCollection.RemoveAll();

  IDTSOutputColumn100 column1 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn1 = output.ExternalMetadataColumnCollection.New();

  IDTSOutputColumn100 column2 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn2 = output.ExternalMetadataColumnCollection.New();

  column1.Name = "MessageContents";
  column1.SetDataTypeProperties(DataType.DT_WSTR, 4000, 0, 0, 0);

  column2.Name = "RoutingKey";
  column2.SetDataTypeProperties(DataType.DT_WSTR, 100, 0, 0, 0);
}

In here I am setting up the Output, adding two columns. The first will contain the message contents, and the second column will contain the routing key that was used to publish the message in RabbitMQ.

Connecting to External Data Sources

As the source will be making use of a connection manager there are two methods to override, these are the AcquireConnections and ReleaseConnections. Both the AcquireConnections and ReleaseConnections methods are called during design time for validation purposes and at runtime to establish a connection for use during package execution.

The AcquireConnections method will look for the ConnectionManager object on the connection that was created in the ProvideComponentProperties method. In the method below, once it has a reference to the connection manager (line 5 and 6) it checks that what has been returned is a RabbitMQConnectionManager (line 8 and 10). It then retrieves the queue name that was set on the source component and then calls the AcquireConnection on the connection manager itself. This will attempt to establish a connection to the RabbitMQ broker, which will return an IConnection. (For more details on this check out the first post.) It then stores the IConnection which was returned so it can be used later on in the component, as well as used in the ReleaseConnections method.

public override void AcquireConnections(object transaction)
{
  if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
  {
    ConnectionManager connectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(
      ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);

    this.rabbitMqConnectionManager = connectionManager.InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager;

    if (this.rabbitMqConnectionManager == null)
      throw new Exception("Couldn't get the RabbitMQ connection manager");

    this.queueName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value;
    rabbitConnection = this.rabbitMqConnectionManager.AcquireConnection(transaction) as IConnection;
  }
}

The next method to override is the ReleaseConnections, whose purpose it is to do any cleanup on any connections created by the AcquireConnections method. All I am doing below is checking that the rabbitMqConnectionManager has been set, then passing the rabbitConnection variable into it to be released, this is the IConnection which was returned in the AcquireConnections method.

public override void ReleaseConnections()
{
  if (rabbitMqConnectionManager != null)
  {
    this.rabbitMqConnectionManager.ReleaseConnection(rabbitConnection);
  }
}

Validating the Source

To ensure that the source has been correctly configured by the user, I have implemented the Validate method. All this does is check if the user has set the queue name. For more details on the Validate method see the MSDN documentation.

public override DTSValidationStatus Validate()
{
  bool cancel;
  string qName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value;

  if (string.IsNullOrWhiteSpace(qName))
  {
    //Validate that the QueueName property is set
    ComponentMetaData.FireError(0, ComponentMetaData.Name, "The QueueName property must be set", "", 0, out cancel);
    return DTSValidationStatus.VS_ISBROKEN;
  }

  return base.Validate();
}

Getting messages out of RabbitMQ and into SSIS

Now that all the dull setup bits are out of the way, it is time for the interesting part!

There are a number of methods that are executed as part of the SSIS Pipeline. In this section I will be showing the PreExecute and PrimeOutput methods. The sequence of execution is:

  1. PrepareForExecute
  2. PreExecute
  3. PrimeOutput for a source and ProcessInput for a destination/transformation
  4. PostExecute
  5. Cleanup

During execution the PreExecute method is called once, therefore the MSDN documentation recommends that you place as much logic in here as possible. In the PreExecute method below, I am creating the channel and consumer that will be used in the PrimeOutput method to retrieve messages from the RabbitMQ queue.

public override void PreExecute()
{
  try
  {
    this.consumerChannel = rabbitConnection.CreateModel();
    this.consumerChannel.QueueDeclare(queueName, true, false, false, null);
    this.queueConsumer = new QueueingBasicConsumer(this.consumerChannel);
    this.consumerTag = consumerChannel.BasicConsume(queueName, true, queueConsumer);
  }
  catch (Exception)
  {
    ReleaseConnections();
    throw;
  }
}

The PrimeOutput method is called after PreExecute; it is in here that rows are added to the output buffer. The number of output buffers in the buffers array is determined by the IDTSOutputCollection100. In the ProvideComponentProperties method at the beginning of this post I am adding a single output to this collection.

In the PrimeOutput method below, you can see that the component keeps retrieving messages from the RabbitMQ queue until no message arrives within the 100 ms dequeue timeout, which in practice means the queue is empty.

For each message that it retrieves, it calls the AddRow method on the buffer, which creates a new row, and then sets the message contents and the routing key of the message on that row.

Finally, the SetEndOfRowset method is called to tell the data flow engine that the source has finished adding rows.

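public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
  // Only one output was added in ProvideComponentProperties, so there is a single buffer.
  PipelineBuffer buffer = buffers[0];

  object message;
  bool success;

  while (queueConsumer.IsRunning)
  {
    try
    {
      // Wait up to 100 ms for the next message.
      success = queueConsumer.Queue.Dequeue(100, out message);
    }
    catch (Exception)
    {
      // Stop reading if the queue can no longer be read, e.g. the channel was closed.
      break;
    }

    if (success)
    {
      BasicDeliverEventArgs e = (BasicDeliverEventArgs)message;

      var messageContent = System.Text.Encoding.UTF8.GetString(e.Body);

      buffer.AddRow();
      buffer[0] = messageContent; // message body column
      buffer[1] = e.RoutingKey;   // routing key column
    }
    else
    {
      // Nothing arrived within the timeout, so treat the queue as empty.
      break;
    }
  }

  // Tell the data flow engine that no more rows will be added.
  buffer.SetEndOfRowset();
}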

That is enough to start getting messages out of RabbitMQ and into the SSIS package.
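
The "keep reading until nothing arrives within the timeout" loop in PrimeOutput can be tried out without SSIS or a broker. The sketch below is an illustration under assumptions: it uses BlockingCollection from the .NET base class library as a stand-in for the RabbitMQ consumer queue, and QueueDrainer is a hypothetical name, not something from the component.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class QueueDrainer
{
    // Takes items until none arrives within timeoutMs,
    // mirroring the Dequeue(100, out message) loop in PrimeOutput.
    public static List<string> DrainUntilIdle(BlockingCollection<string> queue, int timeoutMs)
    {
        var rows = new List<string>();
        string message;

        // TryTake returns false when the timeout elapses with nothing to take.
        while (queue.TryTake(out message, timeoutMs))
        {
            rows.Add(message); // stands in for buffer.AddRow() and setting the columns
        }

        return rows;
    }
}
```

One consequence of this design is worth keeping in mind: a producer that pauses for longer than the timeout looks the same as an empty queue, so the source will finish even though more messages may arrive later.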

Cleaning Up

The last stage is to clean up after ourselves. In the Cleanup method below I am cancelling the consumer that was created in the PreExecute method and then closing the channel.

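public override void Cleanup()
{
  // Guard against PreExecute having failed before the channel or consumer was created.
  if (consumerChannel != null && consumerChannel.IsOpen)
  {
    if (queueConsumer != null && queueConsumer.IsRunning)
    {
      consumerChannel.BasicCancel(consumerTag);
    }
    consumerChannel.Close();
  }
  base.Cleanup();
}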

The ReleaseConnections method will be called by SSIS to release the connection to RabbitMQ.

What it looks like

It is now time to run the package.

Below you can see the data flow tasks highlighted in the middle of the screen. To the right is the QueueName property, and at the bottom is the RabbitMQ Connection Manager.

ssis-rabbitmq-source-1

Below you can see that the RabbitMQ Source has dequeued 2 messages, which are displayed in the data viewer. The tick indicates that it has finished dequeuing messages, which is correct because I only put two messages into the queue.

ssis-rabbitmq-source-2

Up next I will be building a custom interface for configuring the RabbitMQ Source.

Part 2 – Adding a Custom UI to the Connection Manager

In my previous post I explained how to create a custom connection manager for SSIS to connect to RabbitMQ.

In this post I’m going to extend this further by adding a custom UI where the connection can be configured and tested.

Preparing the Project

The first step is to add a few more references:

  • Microsoft.SqlServer.Dts.Design
  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.PipelineHost

These assemblies can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

The next step is to add two new files. The first is an empty class file, which I’ve called RabbitMQConnectionManagerUI.cs; the second is a Windows Forms class, which I’ve called RabbitMQConnectionManagerUIForm.cs.

For SSDT BI to know that there is a custom user interface associated with this connection manager, the DtsConnection attribute on the RabbitMQConnectionManager class needs to be updated by setting the UITypeName property.

The UITypeName refers to the fully qualified type name of the class added above; in the next section I’ll show the interface that this class needs to implement. The fully qualified type name looks like this:

SSISRabbitMQ.RabbitMQConnectionManager.RabbitMQConnectionManagerUI, SSISRabbitMQ.RabbitMQConnectionManager, Version=1.0.0.0, Culture=neutral, PublicKeyToken=abc123...

The PublicKeyToken can be tricky to get if you’ve never had to find it before. To get the PublicKeyToken of your assembly, open the Visual Studio Tools Command Prompt.

Browse to where the assembly is and type the following:

sn.exe -T SSISRabbitMQ.RabbitMQConnectionManager.dll

This will output the Public key token, like this:

ssis-rabbit-sn

The DtsConnection attribute now looks like this:

[DtsConnection(ConnectionType = "RABBITMQ",
    DisplayName = "RabbitMQ Connection Manager",
    Description = "Connection manager for RabbitMQ",
    UITypeName = "SSISRabbitMQ.RabbitMQConnectionManager.RabbitMQConnectionManagerUI, SSISRabbitMQ.RabbitMQConnectionManager, Version=1.0.0.0, Culture=neutral, PublicKeyToken=ac1c316408dd3955")]
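
If you would rather read the token from code than from sn.exe, the base class library exposes it through AssemblyName.GetPublicKeyToken(). The helper below is a minimal sketch; PublicKeyTokenReader is a hypothetical name, and it only formats the bytes as the lowercase hex string used in the UITypeName.

```csharp
using System.Reflection;
using System.Text;

public static class PublicKeyTokenReader
{
    // Formats a public key token as the lowercase hex string used in assembly-qualified names.
    public static string ToHex(byte[] token)
    {
        // An assembly that is not strong-named has no token.
        if (token == null || token.Length == 0)
        {
            return string.Empty;
        }

        var sb = new StringBuilder(token.Length * 2);
        foreach (byte b in token)
        {
            sb.Append(b.ToString("x2"));
        }
        return sb.ToString();
    }

    // Reads the token from an assembly that is already loaded.
    public static string FromAssembly(Assembly assembly)
    {
        return ToHex(assembly.GetName().GetPublicKeyToken());
    }
}
```

For an assembly on disk, AssemblyName.GetAssemblyName(path).GetPublicKeyToken() returns the same bytes without loading the assembly into the process, and the result can be passed straight to ToHex.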

Creating the Custom UI

So at this point the project has been set up with all the required classes, and the DtsConnection attribute has been updated so SSDT BI will know that there is a custom UI to configure the connection. Now the fun begins.

The first step is to open the RabbitMQConnectionManagerUI class file and have it implement the IDtsConnectionManagerUI interface.


public class RabbitMQConnectionManagerUI : IDtsConnectionManagerUI
{
  public void Delete(IWin32Window parentWindow)
  {
    throw new NotImplementedException();
  }

  public bool Edit(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
  {
    throw new NotImplementedException();
  }

  public void Initialize(ConnectionManager connectionManager, IServiceProvider serviceProvider)
  {
    throw new NotImplementedException();
  }

  public bool New(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
  {
    throw new NotImplementedException();
  }
}

After Visual Studio has created all the stubs, we can see there are four methods that need to be implemented. I will come back to these after creating the Windows Forms class.

Next I opened the RabbitMQConnectionManagerUIForm and added a bunch of UI components so it ended up looking like this:

ssis-rabbitmq-winforms

(All the code for the Form will be available in my GitHub repository so I’m only going to cover off the important/interesting parts.)

The next step was to add a new constructor to the Windows Forms class so that some important parameters can be passed through from the RabbitMQConnectionManagerUI, which is the class that instantiates the Form.


public RabbitMQConnectionManagerUIForm(Microsoft.SqlServer.Dts.Runtime.ConnectionManager connectionManager, IServiceProvider serviceProvider)
  : this()
{
  this.connectionManager = connectionManager;
  this.serviceProvider = serviceProvider;

  SetFormValuesFromConnectionManager();
}

The connection manager that is being passed in is what the Form is actually going to be modifying.

The SetFormValuesFromConnectionManager method updates the Form each time it is loaded with the currently set values for the Connection Manager.


private void SetFormValuesFromConnectionManager()
{
  string hostname = connectionManager.Properties["HostName"].GetValue(connectionManager).ToString();
  string username = connectionManager.Properties["UserName"].GetValue(connectionManager).ToString();
  string password = connectionManager.Properties["Password"].GetValue(connectionManager).ToString();
  string virtualhost = connectionManager.Properties["VirtualHost"].GetValue(connectionManager).ToString();
  string port = connectionManager.Properties["Port"].GetValue(connectionManager).ToString();

  if (!string.IsNullOrWhiteSpace(hostname))
  {
    txtHost.Text = hostname;
  }
  if (!string.IsNullOrWhiteSpace(username))
  {
    txtUserName.Text = username;
  }
  if (!string.IsNullOrWhiteSpace(password))
  {
    txtPassword.Text = password;
  }
  if (!string.IsNullOrWhiteSpace(virtualhost))
  {
    txtVirtualHost.Text = virtualhost;
  }
  if (!string.IsNullOrWhiteSpace(port))
  {
    nmPort.Text = port;
  }
}
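
One thing to watch in the method above: calling ToString() on a property value throws a NullReferenceException if that value is null. The sketch below shows a null-safe alternative under assumptions: PropertyReader is a hypothetical helper, and a plain dictionary stands in for connectionManager.Properties so that the example runs without SSIS.

```csharp
using System;
using System.Collections.Generic;

public static class PropertyReader
{
    // Hypothetical helper: 'properties' stands in for connectionManager.Properties.
    public static string ReadAsString(IDictionary<string, object> properties, string name)
    {
        object value;
        if (!properties.TryGetValue(name, out value))
        {
            return string.Empty;
        }

        // Convert.ToString(object) returns an empty string for null,
        // where value.ToString() would throw.
        return Convert.ToString(value);
    }
}
```

Because the Form only copies values that pass the IsNullOrWhiteSpace check, an empty string from this helper simply leaves the corresponding control untouched.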

One of the important parts of the RabbitMQConnectionManagerUIForm class is what happens when the user clicks the OK button. In the event handler below I’m setting the DialogResult to DialogResult.OK so that the calling class knows the user has accepted the changes. There is also a call to the UpdateConnectionFromControls method, which goes through each property in the connection manager and updates it to the value set in the Form.

private void btnOK_Click(object sender, EventArgs e)
{
  UpdateConnectionFromControls();

  this.DialogResult = DialogResult.OK;
  this.Close();
}

The UpdateConnectionFromControls method looks like this:

private void UpdateConnectionFromControls()
{
  int port = Convert.ToInt32(nmPort.Value);

  connectionManager.Properties["HostName"].SetValue(connectionManager, txtHost.Text);
  connectionManager.Properties["UserName"].SetValue(connectionManager, txtUserName.Text);
  connectionManager.Properties["Password"].SetValue(connectionManager, txtPassword.Text);
  connectionManager.Properties["VirtualHost"].SetValue(connectionManager, txtVirtualHost.Text);
  connectionManager.Properties["Port"].SetValue(connectionManager, port);
}

The code for the Test Connection button attempts to create a connection to the RabbitMQ broker using the values specified in the form.

private void btnTestConnection_Click(object sender, EventArgs e)
{
  IConnection connection = null;

  try
  {
    ConnectionFactory rabbitMqConnectionFactory = new ConnectionFactory()
    {
      HostName = txtHost.Text,
      VirtualHost = txtVirtualHost.Text,
      UserName = txtUserName.Text,
      Password = txtPassword.Text,
      Port = Convert.ToInt32(nmPort.Value)
    };

    connection = rabbitMqConnectionFactory.CreateConnection();

    if (connection != null && connection.IsOpen)
    {
      MessageBox.Show("Test connection verified", "RabbitMQ Connection Manager", MessageBoxButtons.OK, MessageBoxIcon.Information);
    }
    else
    {
      MessageBox.Show("Test connection failed", "RabbitMQ Connection Manager", MessageBoxButtons.OK, MessageBoxIcon.Error);
    }
  }
  catch (Exception ex)
  {
    MessageBox.Show("Test connection failed!" + Environment.NewLine + ex.Message, "RabbitMQ Connection Manager", MessageBoxButtons.OK, MessageBoxIcon.Error);
  }
  finally
  {
    // Always release the test connection, whether or not the test succeeded.
    if (connection != null && connection.IsOpen)
    {
      connection.Close();
    }
    else if (connection != null)
    {
      connection.Dispose();
    }
  }
}

This is all I’m going to cover in terms of the Form; for the full class, check it out on GitHub.

Now it is time to go back to the RabbitMQConnectionManagerUI. At the moment this class has four methods that need to be filled in:

  • Initialize
  • New
  • Edit
  • Delete

The Initialize method is very straightforward: I am simply setting a reference in my class to the Connection Manager and Service Provider that were passed through to it. This is so they can then be passed on to the Form class when it is being instantiated, as you will see below.

public void Initialize(ConnectionManager connectionManager, IServiceProvider serviceProvider)
{
  this.connectionManager = connectionManager;
  this.serviceProvider = serviceProvider;
}

The New method is called by SSDT BI after the Initialize method, when the user is creating a new Connection Manager. The MSDN documentation suggests displaying the Form for editing here.

public bool New(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
{
  IDtsClipboardService clipboardService;

  clipboardService = (IDtsClipboardService)serviceProvider.GetService(typeof(IDtsClipboardService));
  // If connection manager has been copied and pasted, take no action.
  if (clipboardService != null)
  {
    if (clipboardService.IsPasteActive)
    {
      return true;
    }
  }

  return EditConnection(parentWindow);
}

The EditConnection method is used by both the New and Edit methods defined on the interface and is used to launch the Form:

private bool EditConnection(IWin32Window parentWindow)
{
  // Dispose the Form once the dialog has closed.
  using (RabbitMQConnectionManagerUIForm frm = new RabbitMQConnectionManagerUIForm(connectionManager, serviceProvider))
  {
    // Passing parentWindow makes the dialog modal to the designer window.
    var result = frm.ShowDialog(parentWindow);

    return result == DialogResult.OK;
  }
}

In the Edit method I am simply displaying the Form using the helper method from above, so the user can edit the connection.

public bool Edit(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
{
  return EditConnection(parentWindow);
}

The Delete method is for any cleanup that is required when the Connection Manager is deleted from the package. As we don’t need to perform any cleanup, the method is very simple:

public void Delete(IWin32Window parentWindow)
{
}

What it looks like

At this point I can now build the project and open SSDT BI to see the result.

ssis-rabbitmq-winforms-result

Stay Tuned! In the next post I will show how to create a Custom Source which uses the connection manager to read messages from a RabbitMQ queue!

Part 1 – Building a Custom Connection Manager

Following on from my introductory post, here I will be explaining how to develop a Custom Connection Manager for RabbitMQ. This connection manager will then be used in future posts by a custom Source and Destination component.

What is a Connection Manager for?

From the MSDN documentation:

“Integration Services uses connection managers to encapsulate the information needed to connect to an external data source. […] If the connection managers and external data sources supported by Integration Services do not entirely meet your requirements, you can create a custom connection manager.”

By the sounds of it, creating a Connection Manager should be pretty straightforward:

  1. Create a class that inherits from the ConnectionManagerBase base class,
  2. Apply the DtsConnectionAttribute attribute to your new class,
  3. Override the important methods and properties of the base class, including the ConnectionString property and the AcquireConnection method.

Prerequisites

All of these examples will be designed against the SDK for SQL Server 2012. You will need to ensure you installed the Client SDK during the installation of SQL Server.

Setting up the solution

I created a new solution and added a new class library project to it called SSISRabbitMQ.RabbitMQConnectionManager.

To this project I then added the RabbitMQ client library via NuGet:

Adding RabbitMQ client via NuGet

The next step is to add a reference to the Microsoft.SqlServer.ManagedDTS.dll, from the documentation:

“The Microsoft.SqlServer.Dts.Runtime namespace contains the classes and interfaces to create packages, custom tasks, and other package control flow elements.”

This assembly can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

ssis-rabbit-sdk

There are a few other assemblies in this directory that will be used in later stages.

For SSDT BI to pick up the custom connection manager it needs to be stored in the GAC, as well as a few other folders in the SQL Server directory.

The next step is to create a post build action that will automate this each time the project is built. Open the project properties by double clicking on Properties or by going to the Project menu and selecting “SSISRabbitMQ.RabbitMQConnectionManager Properties…”

Click Build Events and then paste the following into the Post Build event text area:

"C:\Program Files (x86)\Microsoft SDKs\Windows\v8.0A\bin\NETFX 4.0 Tools\gacutil.exe" -u $(TargetName)
"C:\Program Files (x86)\Microsoft SDKs\Windows\v8.0A\bin\NETFX 4.0 Tools\gacutil.exe" -iF $(TargetFileName)
copy $(TargetFileName) "C:\Program Files (x86)\Microsoft SQL Server\110\DTS\Connections\$(TargetFileName)" /y

copy "$(TargetDir)RabbitMQ.Client.dll" "C:\Program Files (x86)\Microsoft Visual Studio 10.0\Common7\IDE\PublicAssemblies" /y
copy "$(TargetDir)RabbitMQ.Client.dll" "C:\Program Files\Microsoft SQL Server\110\DTS\Binn" /y

The last step in setting up the project is to sign the assembly. This is necessary because the assembly is going into the GAC, and all assemblies that go into the GAC need to be signed with a strong name. See here for more details.

With the properties still open, click on “Signing”.

Tick the checkbox “Sign the assembly” and select “<New…>” from the “Choose a strong name key file” drop down. I created one called ssisrabbitmq.snk.

ssis-rabbit-signing

The solution so far looks like this:

ssis-rabbit-solution

Creating the Connection Manager

Now that the solution is set up with all dependencies, it is time to start on the actual connection manager class.

Start by adding a new class file, “RabbitMQConnectionManager.cs”, and apply the DtsConnection attribute to the class. There are three properties which will need to be set:

  • ConnectionType
  • DisplayName
  • Description

These are the properties which will appear in SQL Server Data Tools BI when a user goes to add a new connection.

The next step is to extend the ConnectionManagerBase class.

This is the code so far:

[DtsConnection(ConnectionType = "RABBITMQ",
               DisplayName = "RabbitMQ",
               Description = "Connection Manager for RabbitMQ")]
public class RabbitMQConnectionManager : ConnectionManagerBase
{
}

The next step is to add a number of properties which will be used for the actual connection to the RabbitMQ broker:

public string HostName { get; set; }
public string VirtualHost { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public int Port { get; set; }

For the sake of speedy development, in the constructor I’m setting the default properties for the connection.

public RabbitMQConnectionManager()
{
  HostName = "localhost";
  VirtualHost = "/";
  UserName = "guest";
  Password = "guest";
  Port = 5672;
}

Validating the Connection Manager

Overriding the Validate method lets SSDT BI check that the user has configured the connection correctly. If the method returns DTSExecResult.Failure then the package won’t begin execution.

public override Microsoft.SqlServer.Dts.Runtime.DTSExecResult Validate(Microsoft.SqlServer.Dts.Runtime.IDTSInfoEvents infoEvents)
{
  // Every connection property must be set before the package is allowed to run.
  if (string.IsNullOrWhiteSpace(HostName))
  {
    return DTSExecResult.Failure;
  }
  else if (string.IsNullOrWhiteSpace(VirtualHost))
  {
    return DTSExecResult.Failure;
  }
  else if (string.IsNullOrWhiteSpace(UserName))
  {
    return DTSExecResult.Failure;
  }
  else if (string.IsNullOrWhiteSpace(Password))
  {
    return DTSExecResult.Failure;
  }
  else if (Port <= 0)
  {
    return DTSExecResult.Failure;
  }

  return DTSExecResult.Success;
}
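
The Validate method above fails without saying which property is wrong. A minimal sketch of one way to make that visible is shown below. It is kept free of SSIS types so it can be run on its own; ConnectionSettingsValidator and its messages are hypothetical, and the upper bound on the port is my own addition rather than something the connection manager checks.

```csharp
public static class ConnectionSettingsValidator
{
    // Returns null when the settings look usable, otherwise a message naming the first problem.
    public static string FirstProblem(string hostName, string virtualHost, string userName, string password, int port)
    {
        if (string.IsNullOrWhiteSpace(hostName)) return "The HostName property must be set";
        if (string.IsNullOrWhiteSpace(virtualHost)) return "The VirtualHost property must be set";
        if (string.IsNullOrWhiteSpace(userName)) return "The UserName property must be set";
        if (string.IsNullOrWhiteSpace(password)) return "The Password property must be set";

        // Assumption: usable TCP ports run from 1 to 65535, so anything outside that range is rejected.
        if (port <= 0 || port > 65535) return "The Port property must be between 1 and 65535";

        return null;
    }
}
```

Inside Validate, a non-null result could be reported through the infoEvents parameter before returning DTSExecResult.Failure, so that the reason shows up in the designer. I have left that call out to keep the example independent of the SSIS assemblies.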

Setting up the connection to RabbitMQ

The next two methods to override are AcquireConnection and ReleaseConnection.

These methods are called at various times during Design Time and Run Time.


public override object AcquireConnection(object txn)
{
  ConnectionFactory connFactory = new ConnectionFactory()
  {
    UserName = UserName,
    HostName = HostName,
    Password = Password,
    Port = Port,
    VirtualHost = VirtualHost
  };

  var connection = connFactory.CreateConnection();

  return connection;
}

In the AcquireConnection method we are basically just setting up the connection to RabbitMQ. (As I am aiming to be as simple as possible I’m leaving out a lot of exception handling, so beware!)

If you aren’t familiar with the RabbitMQ API for C# then it is definitely worthwhile to have a read through the user guide which can be found on the RabbitMQ website here.

The next method is ReleaseConnection:


public override void ReleaseConnection(object connection)
{
  if (connection != null)
  {
    ((IConnection)connection).Close();
  }
}

Here we are closing the connection.
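
The MSDN steps quoted earlier also mention overriding the ConnectionString property, which this connection manager does not do. One possible approach, and this is only a sketch, would be to expose the five settings as a single AMQP URI. AmqpUriBuilder below is a hypothetical helper that is not part of the project; it percent-encodes the user name, password and virtual host so that the default virtual host "/" becomes %2F.

```csharp
using System;

public static class AmqpUriBuilder
{
    // Hypothetical helper, not part of the connection manager in this post.
    // Builds amqp://user:password@host:port/vhost with user, password and vhost percent-encoded.
    public static string Build(string hostName, int port, string virtualHost, string userName, string password)
    {
        return string.Format(
            "amqp://{0}:{1}@{2}:{3}/{4}",
            Uri.EscapeDataString(userName),
            Uri.EscapeDataString(password),
            hostName,
            port,
            Uri.EscapeDataString(virtualHost));
    }
}
```

With the defaults set in the constructor, Build("localhost", 5672, "/", "guest", "guest") gives amqp://guest:guest@localhost:5672/%2F. Bear in mind that a string like this contains the password in plain text, so it should be treated as carefully as the Password property itself.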

The solution so far

So far we have a custom connection manager which users can use to add connections to RabbitMQ into their packages. This is obviously useless until the Source and Destination components have been added.

After building the solution and creating a new Integration Services project in SSDT BI I can now add the connection manager into a new package:

ssis-rabbitmq-adding-connection

The properties are also visible:

ssis-rabbitmq-connection-properties

That is it for now on the custom connection manager.

Stay tuned: in the next post I will be showing how to create a custom user interface for the RabbitMQConnectionManager.

UPDATE:

Something that I forgot to mention: because the post build action calls gacutil.exe, you will need to run Visual Studio as Administrator.

Building Custom SSIS Components

Introduction

I’ve been using SSIS for a long time and have always been curious about developing custom sources and destinations.

After having a bit of a look through the MSDN documentation (http://msdn.microsoft.com/en-us/library/ms345161.aspx) last night I thought I would give it a go.

So over the next couple of blog posts I’m going to attempt to create a custom Connection Manager, Source and Destination for RabbitMQ.

RabbitMQ is a widely used message broker built on the Advanced Message Queuing Protocol (AMQP).

Hopefully at the end of this series we will have:

  • Connection Manager which can be used by both the source and destination components which connects to the RabbitMQ broker
  • Source component which can read from a queue
  • Destination component which can send messages to an exchange

You can follow the source code updates in my GitHub repository.

The Series

As the series goes on I will add links to each article here.

Useful Resources

MSDN Documentation: http://msdn.microsoft.com/en-us/library/ms345161.aspx

Developing a Custom Source Component: http://msdn.microsoft.com/en-us/library/ms136088.aspx

Developing a Custom Destination Component: http://msdn.microsoft.com/en-us/library/ms135899.aspx

SQLBits 8 – SSIS Custom Components: http://sqlbits.com/Sessions/Event8/SSIS_Custom_Componenets by Dave Ballantyne (I haven’t watched this yet)