Getting Started with the Hortonworks Sandbox

In my previous post, I made reference to the Twitter Big Data example for Microsoft StreamInsight (project page).

The sample collects tweets in real-time from Twitter then does a few things:

  • Displays current information about trends in real-time on a web dashboard
  • Stores information about the tweets into a SQL Azure database
  • Store the actual tweets into an Azure Blob Store

Then there are some commands you can use with Azure HDInsight to do post-processing. This is great if you have access to the HDInsight Preview, but what if you're stuck on the waiting list? That's where the Hortonworks Sandbox comes in!

In this post, I'm going to give an overview of getting the sandbox set up and then show how to move the data collected by StreamInsight from Azure into the Sandbox. In the next post I will show how to do the analysis.

Getting Started with the Sandbox

You will need to download:

  • Oracle VirtualBox
  • The Hortonworks Sandbox virtual appliance for VirtualBox

Once VirtualBox has been installed, ensure that there is a host-only network configured.

To do this, go to File -> Preferences then click “Network”. You should see an entry in the Host-only Networks:


Click on the screwdriver icon to edit the entry, and ensure that the adapter settings match the following:



Once the network is correctly configured, click OK until all the windows are closed.

The next step is to import the sandbox appliance. From the VirtualBox Manager, click File -> Import Appliance; this will bring up the import wizard.

On the first page click the Open appliance button and browse to where you downloaded the sandbox. Click Next and you will see the "Appliance settings" page; you shouldn't have to change any details.

Click Import and a progress window will appear. Once the import has completed, you will see the sandbox in the list of servers:


Configuring the Sandbox

The next step is to configure the network adapters.

Right click on the Hortonworks Sandbox, and click Settings. Then click Network to bring up the Network Configuration.

I configured my adapters like this:


The Sandbox is now configured.

You can now boot the sandbox.
Right-click the Hortonworks Sandbox in the VirtualBox Manager and click Start. This will boot the Sandbox; after a minute or two you will see the following:


This is where I was caught out: from looking at this window you don't know what IP address you should use to browse to the Sandbox. By configuring the host-only network, you can browse to the sandbox on the host-only adapter's IP address.

You will be asked to fill in some details about yourself then you’re good to go.

Adding the SQL Server driver for Apache Sqoop

Apache Sqoop is a tool designed for efficiently transferring bulk data between Hadoop and structured datastores such as relational databases.

In the Twitter Big Data sample, SQL Azure is used to store information about the Tweets that it collects. The Sandbox, when first set up, is missing the driver for SQL Server; to add it, follow these steps:
  1. Find the JDBC driver for MS SQL Server on Microsoft's download site, currently sqljdbc_4.0.2206.100_enu.tar.gz
  2. Using PuTTY, SSH onto the sandbox and log on as root with password hadoop
  3. Issue the following command to download the driver: wget
  4. Unzip using this command: gunzip sqljdbc_4.0.2206.100_enu.tar.gz
  5. Extract the tar: tar -xvf sqljdbc_4.0.2206.100_enu.tar
  6. Copy to the Sqoop lib directory: cp sqljdbc_4.0/enu/sqljdbc4.jar /usr/lib/sqoop/lib/

That covers all the configuration and setup of the sandbox.

Setting up the Twitter Big Data Solution

The Twitter Big Data solution can be downloaded from the CodePlex project site, by clicking on Source Code, then Download.


You will need to have StreamInsight installed and running on your development machine, installation instructions can be found here.

The Twitter Big Data solution comes with a README file which outlines the steps you need to go through to configure the solution. You will need to fill in the app.config file with details of your SQL Azure database and Blob store. Once that is done you can run the solution and you should see data in the web dashboard; at the same time records will be going into your blob store and SQL Azure database.

Once there is some data in the SQL Azure database and the Blob Store, we can use various components of the Hortonworks Sandbox to pull data from Azure into Hadoop.

The remainder of this post is basically going to be a re-write of the instructions found in the README file of the solution.

Moving data from SQL Azure to Hadoop using Sqoop

Sqoop is a component in the sandbox which connects to different data sources and moves their data into HDFS (the Hadoop Distributed File System).

I wasn't able to get Sqoop working with SQL Azure via the nice UI provided by the sandbox, so I used SSH instead. You will need PuTTY or another SSH client to complete this stage.

  • Connect to the sandbox via SSH and log on with user root, password hadoop
  • Execute the following command:

sqoop import --connect "jdbc:sqlserver://;username=your_sql_azure_username_here@sql_azure_server_name_here;password=your_sql_azure_password_here;database=your_db_name_here" --table TweetInfo --hive-import --hive-overwrite

Sqoop will now start moving data from your SQL Azure database into the sandbox.  This shouldn’t take more than a few minutes, depending on how much data you have in the SQL Azure database and your connection speed.
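The connection string is the fiddly part: for SQL Azure the username takes the form user@servername. Here's a small Python sketch that assembles the full command; it's illustrative only, and every server and credential value below is a placeholder.

```python
# Assemble the Sqoop import command for a SQL Azure database.
# All parameter values are placeholders, not real credentials.
def build_sqoop_import(server, user, password, database, table):
    # SQL Azure expects the username in the form user@servername
    conn = (
        f"jdbc:sqlserver://{server};"
        f"username={user}@{server.split('.')[0]};"
        f"password={password};"
        f"database={database}"
    )
    return (
        f'sqoop import --connect "{conn}" '
        f"--table {table} --hive-import --hive-overwrite"
    )

print(build_sqoop_import(
    "myserver.database.windows.net", "sqluser", "secret", "TwitterDb", "TweetInfo"))
```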

Moving data from Azure Blob Storage into HDFS

The nice thing about HDInsight in Azure is that it can be configured to use Azure Storage Vault, which basically means you can map the file system of your Hadoop cluster to Azure blob storage. Unfortunately, there is no straightforward way that I could find (if I'm wrong please let me know!) to connect the sandbox to Azure blob storage.

In the README file, the author has you map Hadoop on Azure (HDInsight) to Azure Storage Vault (ASV). Because I couldn't figure out a good way to do this, what I ended up doing is a bit clunky.

I used the Azure Storage Explorer to download all the blobs from the container I specified in the app.config into a directory. I then zipped all of these files and uploaded the zip file into the Sandbox.
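The zipping step is nothing special; this Python sketch shows one way to do it, keeping paths relative so the sandbox extracts everything into a single folder. The directory name "tweets" is just an example — use wherever you saved the downloaded blobs.

```python
import os
import zipfile

def zip_directory(directory, zip_path):
    """Zip every file under `directory` into `zip_path`."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(directory):
            for name in files:
                full = os.path.join(root, name)
                # Store paths relative to the directory so the sandbox
                # extracts into a single folder named after the zip file.
                zf.write(full, os.path.relpath(full, directory))

zip_directory("tweets", "tweets.zip")
```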

Uploading the zip file to the Sandbox is very easy. With the Sandbox open in your web browser, click the File Browser icon:


Then click Upload -> Zip File:


From here select the zip file with all your tweets. This will upload the file, then extract all the files into a directory of the same name as the zip file.

Processing the Tweets using Hive

There is a script included in the solution called “Analyze Tweets with Hive.txt”. I have made some small modifications to it, which you can download here. If you compare this file to the original you will notice that the first two configuration lines have been removed as well as some of the lines which were commented out. Also, the location for the raw_tweets table has been updated.

To run this script using the Sandbox, in your browser:

  • Click on the Beeswax icon, this is the Hive UI
  • In the Settings section, add the following settings:
    • key: hive.exec.dynamic.partition value: true
    • key: hive.exec.dynamic.partition.mode value: nonstrict
  • Open the modified script, and copy and paste the contents into the query editor
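Alternatively, the same two settings can go at the top of the Hive script itself using the standard SET syntax, which has the same effect as entering them in the Beeswax Settings section:

```sql
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
```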

Click Execute to start the script

This will then start processing the raw tweets that were stored in the Azure blob storage, which we moved into HDFS.

Execution time depends on how many tweets were collected. Unfortunately you can't leave this unattended, as you will see the following prompt come up:


You will need to click Next to continue processing.

Installing the Hive ODBC Driver

To analyse the tweets using Excel you will need to install the ODBC Driver from Hortonworks, which can be found here.

You will need to make sure you install the version that corresponds to the version of Excel you have installed (32bit / 64bit).

After you have installed the driver, open Administrative Tools. Then open ODBC Data Sources 32 bit / 64 bit.

Click the System DSN tab, and you should see:


Click on Configure, here you will need to fill in the host and user name fields:


At this point you have all the Tweet information generated by the StreamInsight application in the Sandbox, and the ODBC driver installed and configured.

In the next post I will show how to analyse this information.

Real-Time Analytics for Twilio

Recently I have been looking into complex event processing, what the use cases for it are and the kind of insights you can get from it.

As I am a .Net developer, I thought it best to start with the Microsoft stack, so I have been experimenting with StreamInsight. There are a few examples floating around, but the one that really inspired me was the Big Data Twitter Demo (link).

After seeing the Twitter demo I jumped into gear and built a real-time dashboard for Twilio.

For those who are not familiar with Twilio, they are a cloud-based telephony company that sells telephone numbers powered by REST APIs. This allows developers to build complex and dynamic telephony applications (more details).

So what did I build, and why did I bother?

You can see from the screenshot below that it is a very simple real-time dashboard.


The graph shows:

  • In blue – total number of calls that started in a 5 second window
  • In black – total number of calls that ended in a 5 second window

All of this data is produced by making a telephone call to my Twilio phone number.

(If you add up the numbers you will see that the starts and ends aren't equal; this is because at the time of the screenshot a number of calls were still in progress.)

The system is not complicated, but there are a number of different parts to it:

  • ASP.Net WebAPI – This is the endpoint that Twilio fires requests to when it receives a phone call to my telephone number
  • RabbitMQ – When a request is made from Twilio to the WebAPI, the WebAPI will send a “start” or “end” message to RabbitMQ
  • StreamInsight application – This is the glue between the events received and the output sent to the client. It listens to the RabbitMQ queue and then with the events received does the fancy “complex event processing”
  • ASP.Net MVC Website – Which uses SignalR (a realtime framework for communication between a web client and server) to update the graph as events are processed

Looking at this stack you’re probably thinking,

“that is way too complicated to get a simple graph on a webpage!”

And you’d be right if all I wanted was a simple graph, I could just have my WebAPI broadcast messages directly to SignalR and cut out everything in between.

What StreamInsight Adds

If all I wanted was a graph showing the number of calls starting in a given window, I could probably build up the logic to do that with a bit of effort myself. However, what StreamInsight is designed for is complex event processing.

So to collect the number of events in a 5 second window all I have to do is write this query:

var callsByType = from call in rabbitMQInput
				  group call by call.EventType into groups
				  from win in groups.TumblingWindow(TimeSpan.FromSeconds(5))
				  select new CallSummary
				  {
					TotalCalls = win.Count(),
					EventType = groups.Key
				  };

To get it working end to end requires a few more components, but what I’m trying to show here is how simple it is to write queries. In later posts I’m going to explain how to write a StreamInsight application.
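To make the window semantics concrete: a tumbling window chops the timeline into fixed, non-overlapping buckets, and each event falls into exactly one of them. Here's a Python sketch of what the StreamInsight query above computes over a finished batch of events (illustrative only — StreamInsight does this incrementally over a live stream):

```python
from collections import Counter

def tumbling_counts(events, window_seconds=5):
    """events: iterable of (timestamp_seconds, event_type) tuples.

    Returns a dict keyed by (window_start, event_type) -> count, where
    each event belongs to exactly one fixed-size, non-overlapping window.
    """
    counts = Counter()
    for ts, event_type in events:
        window_start = int(ts // window_seconds) * window_seconds
        counts[(window_start, event_type)] += 1
    return dict(counts)

calls = [(0.5, "start"), (1.2, "start"), (3.9, "end"), (6.1, "start")]
print(tumbling_counts(calls))
# window [0,5) holds 2 starts and 1 end; window [5,10) holds 1 start
```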

The example above is very trivial, so imagine that rather than a single Twilio number you are building a system for a large telecoms company with hundreds or thousands of phone numbers, geographically dispersed, each tied to a different department, and you want to see which departments receive the largest volume of calls.

Or, take a more complicated example: you want to correlate diagnostic events against the number of failed calls since an outage.

It would be very difficult (nigh on impossible) to write this kind of logic inside a Web API controller method.

With StreamInsight it is very easy: you can do all kinds of aggregations on events, and you can join multiple event streams together. This is where the power of StreamInsight and complex event processing lies.

Now I hope you’re beginning to see the necessity of all the pieces of the stack I mentioned above.

The advantage of using RabbitMQ as the input into StreamInsight is that you could have events from other systems (e.g. diagnostics) all flowing into one place, where StreamInsight can process them to find patterns and alert engineers in real time.

Loading a Data Warehouse Dimension with RabbitMQ and SSIS

In this post I’m going to take a break from the actual development of custom components for SSIS and show how the RabbitMQ Source that I have created so far could be used in an ETL package to load a data warehouse.

The Scenario

The scenario for this post is a company that sells cool stuff over the internet. They have a website where people can sign up, and registered users can place orders.

The way this imaginary system works:

  • When a user signs up, their details are created in the operational database which is used by the website. After creation their details are then sent to RabbitMQ
  • When a user updates their details, they are updated in the operational database, and this information is also sent to RabbitMQ
  • When a user places an order, the details are sent to two different RabbitMQ queues for processing:
    • An imaginary process writes these details into the operational database in “real-time”
    • An ETL process runs periodically to load the orders into the data warehouse

In this post I’m going to show how the customer dimension can be loaded from the RabbitMQ queue, in a future post I will show how to load the fact table via SSIS.

My ETL package will do the following:

  • Read from a queue of customers, parse the XML and populate attribute columns
  • Use the SCD (slowly changing dimension) component to populate the customer dimension

Preparing the solution

The only preparation I did before starting was to create a new database with a new table, DimCustomer:

CREATE TABLE [dbo].[DimCustomer](
	[CustomerKey] [int] IDENTITY(1,1) NOT NULL,
	[CustomerBusinessKey] [int] NOT NULL,
	[FirstName] [nvarchar](50) NOT NULL,
	[Surname] [nvarchar](50) NOT NULL,
	[Gender] [char](1) NOT NULL,
	[City] [nvarchar](50) NOT NULL,
	[Country] [nvarchar](50) NOT NULL,
	[MaritalStatus] [char](1) NOT NULL,
	[StartDate] [datetime] NULL,
	[EndDate] [datetime] NULL,
	CONSTRAINT [PK_DimCustomer] PRIMARY KEY CLUSTERED ([CustomerKey] ASC)
)

Creating the ETL Package

The ETL Package is going to have a number of different steps that it goes through.

  1. Retrieve customer messages from queue
  2. Parse XML
  3. Load customers into the dimension, accounting for SCD attributes

The end result ended up looking like this:


Building the package is actually very straightforward. After creating a new solution and adding a new package, I added a new data flow task.

To the data flow task, I first added the RabbitMQ Source and configured it with a new RabbitMQ Connection Manager pointing at my local broker, and told it to use the customer queue.

Next I added a script component; this is where the XML message from RabbitMQ will be parsed.

The script component is the key to making the messages in RabbitMQ understood in the SSIS package. What the script does is take in each message and populate rows in a new output buffer. This output buffer contains columns corresponding to the elements in the XML message.

The XML messages that are published look like this:

  <customer>
    <id>1</id>
    <firstname>Peter</firstname>
    <surname>Parker</surname>
    <gender>M</gender>
    <city>New York</city>
    <country>USA</country>
    <maritalstatus>S</maritalstatus>
  </customer>

To configure the Script component, first add a new output then add each of the fields from the xml above.


Next edit the script itself and updated the Input0_ProcessInputRow method:

public override void Input0_ProcessInputRow(Input0Buffer Row)
{
  var stringReader = new System.IO.StringReader(Row.MessageContents);
  var xmlDoc = new XmlDocument();
  xmlDoc.Load(stringReader);

  Row.CustomerKey = int.Parse(xmlDoc.GetElementsByTagName("id")[0].InnerText);
  Row.FirstName = xmlDoc.GetElementsByTagName("firstname")[0].InnerText;
  Row.Gender = xmlDoc.GetElementsByTagName("gender")[0].InnerText;
  Row.Surname = xmlDoc.GetElementsByTagName("surname")[0].InnerText;
  Row.City = xmlDoc.GetElementsByTagName("city")[0].InnerText;
  Row.Country = xmlDoc.GetElementsByTagName("country")[0].InnerText;
  Row.MaritalStatus = xmlDoc.GetElementsByTagName("maritalstatus")[0].InnerText;
}

That is it for the script component. The next step is to drag in the Slowly Changing Dimension component; after connecting the script component to it, double-clicking will bring up the Slowly Changing Dimension Wizard.

The first page of the wizard is where you configure the connection and table of the dimension, after doing that the wizard will attempt to match the different columns in the table to columns from the input into the component. The next step on this page is to select which column is the business key.


The next page is where the SCD types are set for the columns. Historical is SCD Type 2, Changing is SCD Type 1 and Fixed is SCD Type 0.


In the next page you can configure how SSIS should deal with different scenarios, below you can see that I’ve opted not to fail the package if fixed attributes change, and ticked the second option.


The next page is used to configure how Type 2 dimensions should be stored in the dimension table, so that they can be recognized.

The wizard gives us two options:

  • Use a status column: the first wizard option, whereby a column is selected as the indicator column and two values are chosen to correspond to the current and expired flags
  • Use start/end date columns: the second wizard option, for which two datetime columns are selected along with the variable that supplies the date value

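The start/end date option boils down to a simple pattern: when a historical (Type 2) attribute changes, end-date the current row and insert a new current row. This Python sketch shows the bookkeeping; it's illustrative only, not what the SSIS component does internally, and the field names are taken from the DimCustomer table above.

```python
from datetime import datetime

def apply_scd2(dimension, business_key, new_attrs, now=None):
    """SCD Type 2 with start/end dates: expire the current row for
    business_key if its attributes changed, then insert a new current row.
    dimension is a list of dict rows; EndDate None marks the current row."""
    now = now or datetime.utcnow()
    for row in dimension:
        if row["CustomerBusinessKey"] == business_key and row["EndDate"] is None:
            if all(row.get(k) == v for k, v in new_attrs.items()):
                return  # nothing changed, keep the current row
            row["EndDate"] = now  # expire the current version
            break
    dimension.append(
        {"CustomerBusinessKey": business_key, **new_attrs,
         "StartDate": now, "EndDate": None}
    )

dim = []
apply_scd2(dim, 1, {"City": "New York"})
apply_scd2(dim, 1, {"City": "Los Angeles"})
# dim now holds two rows: the expired New York row and a current LA row
```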

The next page gives the option to enable inferred member support, which is outside the scope of this article. I have left it turned off.


After clicking Finish, the wizard will close and the remaining elements, that you can see in the picture of the complete package above, will be added.

Running the package

To demonstrate the package, I'm going to use the RabbitMQ management interface to put some messages into the queue and then run the package.

The first message I’m placing into the queue is this:

  <customer>
    <id>1</id>
    <firstname>Peter</firstname>
    <surname>Parker</surname>
    <gender>M</gender>
    <city>New York</city>
    <country>USA</country>
    <maritalstatus>S</maritalstatus>
  </customer>

This is the XML sample from above. After running the package I can query the database and get the following output:


The next two messages I’m putting in are:

  <customer>
    <id>1</id>
    <firstname>Peter</firstname>
    <surname>Parker</surname>
    <gender>M</gender>
    <city>Los Angeles</city>
    <country>USA</country>
    <maritalstatus>S</maritalstatus>
  </customer>

  <customer>
    <id>2</id>
    <firstname>Clarke</firstname>
    <surname>Kent</surname>
    <gender>M</gender>
    <city>Metropolis</city>
    <country>USA</country>
    <maritalstatus>S</maritalstatus>
  </customer>

The first is an update to Peter Parker; the second is a new customer. After running the package the output from the table is now this:


You can see that the package has identified that Peter Parker is an existing customer, so it expires the old dimension row and adds a new one; it also adds a new row for Clarke Kent.


As you can see, using the RabbitMQ source is very easy, and thanks to some of the pre-built components in SSIS, building a package that loads dimensions into a data warehouse is too.

I’m going to be taking a break from blogging about Custom SSIS Components and start a new series on building a real-time monitoring system with StreamInsight.

Part 4 – Add a Custom UI to the RabbitMQ Source

A lot of the content in this article is similar to what I did in Part 2, so if you haven’t read it I suggest you go back and read it now 🙂

The next part of this series will take a look at implementing a custom UI for configuration of the RabbitMQ Source component. The main reference for this entry can be found on MSDN.

A shout out and my thanks goes to Tillmann Eitelberg (Web | Twitter) who pointed me in the right direction when I was trying to figure out how to get a list of all the connection managers in a package. Thanks!

Preparing the Project

To get my project ready I added two new files to the RabbitMQSource project. The first was an empty class file called RabbitMQSourceUI.cs and the second was a Windows Forms class called RabbitMQSourceUIForm.cs.

Then the DtsPipelineComponent attribute on the RabbitMQSource class needs to have the UITypeName property set. This indicates to SSDT BI that the custom source has a user interface for configuration. Just like in Part 2, the UITypeName property is the fully qualified type name of the RabbitMQSourceUI class. (See Part 2 for details on how the fully qualified type name is constructed).

The DtsPipelineComponent attribute, after setting the UITypeName property looks like this:

[DtsPipelineComponent(DisplayName = "RabbitMQ Source",
    ComponentType = ComponentType.SourceAdapter, Description = "Connection source for RabbitMQ",
    UITypeName = "SSISRabbitMQ.RabbitMQSource.RabbitMQSourceUI, SSISRabbitMQ.RabbitMQSource, Version=, Culture=neutral, PublicKeyToken=ac1c316408dd3955")]

Creating the Custom UI

Open the RabbitMQSourceUI file and have the class implement the IDtsComponentUI interface.

Similar to what I did for the RabbitMQ Connection Manager UI, I’m only implementing some of the methods.

The Initialize and New methods are called when the RabbitMQ Source component is first created. In the Initialize method below, I’m storing the service provider and meta data objects that were passed through, into class variables. These will then be passed through to the Form and used to update the RabbitMQ Source component as well as the package.

public void Initialize(IDTSComponentMetaData100 dtsComponentMetadata, IServiceProvider serviceProvider)
{
  this.serviceProvider = serviceProvider;
  this.metaData = dtsComponentMetadata;

  this.connectionService = (IDtsConnectionService)serviceProvider.GetService(typeof(IDtsConnectionService));
}

The New method is called right after Initialize and it just displays the form.

public void New(IWin32Window parentWindow)
{
  ShowForm(parentWindow);
}

The ShowForm method is used by both the New and Edit methods.

private DialogResult ShowForm(IWin32Window window)
{
  RabbitMQSourceUIForm form = new RabbitMQSourceUIForm(this.metaData, this.serviceProvider);

  return form.ShowDialog(window);
}

The Edit method is called when the user double clicks on the component, or right clicks and selects “Edit”.

public bool Edit(IWin32Window parentWindow, DTSRuntime.Variables variables, DTSRuntime.Connections connections)
{
  return ShowForm(parentWindow) == DialogResult.OK;
}

The next step is to design the user interface. After adding various UI elements to the RabbitMQUIForm, I ended up with a design that looks like this:


All the code is available on GitHub if you’re interested in seeing how the Form was put together.

The next step is to add some methods in the form class.

In the constructor I am storing the service provider and meta data that were passed through, as well as getting an instance of the IDtsConnectionService and an instance of CManagedComponentWrapper.

public RabbitMQSourceUIForm(Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSComponentMetaData100 metaData, IServiceProvider serviceProvider)
  : this()
{
  this.metaData = metaData;
  this.serviceProvider = serviceProvider;
  this.connectionService = (IDtsConnectionService)serviceProvider.GetService(typeof(IDtsConnectionService));
  this.designTimeInstance = metaData.Instantiate();
}

The connection manager service exposes the connection managers that have already been created in the package; it also provides a number of methods to create new ones.

In the RabbitMQSourceUIForm_Load method below, I am using the connection manager service to retrieve a list of all connection managers, check whether they are RabbitMQConnectionManagers, and then populate the connections combo box. This lets the user select which connection manager to use for the source.

It will also preselect the connection manager currently assigned to the source, if there is one, and set the queue name textbox to the QueueName property if it has been set. This is useful when editing the source.

private void RabbitMQSourceUIForm_Load(object sender, EventArgs e)
{
  var connections = connectionService.GetConnections();

  var queueName = metaData.CustomPropertyCollection[0];
  txtQueueName.Text = queueName.Value;

  string connectionManagerId = string.Empty;

  var currentConnectionManager = this.metaData.RuntimeConnectionCollection[0];
  if (currentConnectionManager != null)
  {
    connectionManagerId = currentConnectionManager.ConnectionManagerID;
  }

  for (int i = 0; i < connections.Count; i++)
  {
    var conn = connections[i].InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager;

    if (conn != null)
    {
      var item = new ConnectionManagerItem()
      {
        Name = connections[i].Name,
        ConnManager = conn,
        ID = connections[i].ID
      };

      cbConnectionList.Items.Add(item);

      if (connections[i].ID.Equals(connectionManagerId))
      {
        cbConnectionList.SelectedIndex = cbConnectionList.Items.Count - 1;
      }
    }
  }
}

ConnectionManagerItem is a custom class which I created to populate the combo box – see GitHub source for details.

When the user clicks the OK button two things need to happen, first the Queue Name property needs to be set to what is on the form, and second the selected connection manager needs to be associated with the connection manager defined in the component.

private void btnOK_Click(object sender, EventArgs e)
{
  if (!string.IsNullOrWhiteSpace(txtQueueName.Text))
  {
    designTimeInstance.SetComponentProperty("QueueName", txtQueueName.Text);
  }

  if (cbConnectionList.SelectedItem != null)
  {
    var item = (ConnectionManagerItem)cbConnectionList.SelectedItem;
    this.metaData.RuntimeConnectionCollection[0].ConnectionManagerID = item.ID;
  }

  this.DialogResult = System.Windows.Forms.DialogResult.OK;
}

If you read part 3, you will hopefully remember what I said about having to hard-code the ConnectionManagerID property in the ProvideComponentProperties method; the snippet above fixes that hard-coded value 🙂

Having used SSIS for some time now, I appreciate that there is a New button on most source components to create a new connection manager, which streamlines the process of creating sources. So I thought I should provide users of my component with the same feature.

In the method below you can see that I am using the connection service to create a new connection. The type is RABBITMQ, which matches the ConnectionType in the DtsConnection attribute on the RabbitMQConnectionManager class created in part 1. What is really nice is that, since I implemented a custom UI for the RabbitMQ connection manager, it is displayed to the user as soon as they click the New button on the form. This is all done by SSDT BI when the CreateConnection method on the connection service is called. So easy.

The MSDN documentation says CreateConnection returns an ArrayList holding all the connections that were created; if the user cancels the creation, it returns an empty list. As far as I can tell it should only ever return one item.

private void btnNewConnectionManager_Click(object sender, EventArgs e)
{
  System.Collections.ArrayList created = connectionService.CreateConnection("RABBITMQ");

  foreach (ConnectionManager cm in created)
  {
    var item = new ConnectionManagerItem()
    {
      Name = cm.Name,
      ConnManager = cm.InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager,
      ID = cm.ID
    };

    cbConnectionList.Items.Insert(0, item);
  }
}

Adding an Icon

In the screenshots below you will see an icon on the RabbitMQ Source component. To get the icon to show up, I added an ico file to my project then set the Build Action to “Embedded Resource”.

The only other thing left to do is update the DtsPipelineComponent attribute on the RabbitMQSource class. After setting the IconResource property this now looks like:

  [DtsPipelineComponent(IconResource = "SSISRabbitMQ.RabbitMQSource.Rabbit.ico",
    DisplayName = "RabbitMQ Source",
    ComponentType = ComponentType.SourceAdapter,
    Description = "Connection source for RabbitMQ",
    UITypeName = "SSISRabbitMQ.RabbitMQSource.RabbitMQSourceUI, SSISRabbitMQ.RabbitMQSource, Version=, Culture=neutral, PublicKeyToken=ac1c316408dd3955")]

What it looks like

Now time to see how this makes configuration a much easier experience!

The screenshot below shows what the user sees when they first add a RabbitMQ Source to a data flow task; the dialog box is displayed straight away.


After clicking the New button, the dialog box created in part 2 is displayed for the user to setup the RabbitMQ Connection Manager


After clicking OK, the connection manager is created and appears in the drop-down.


Finally, after clicking OK for the source to be created you can see that the source has passed validation, and that a RabbitMQ Connection Manager has been created – note that this no longer needs to be called “RabbitMQ”.


Up next I’m going to take a short break from the development side of things and show how the RabbitMQ Source that has so far been developed could be used to load a fictional data warehouse.

Part 3 – Building a Custom Source Component

So far in this series I have shown how to create a custom connection manager and also how to create a user friendly interface to configure and test it. In this post I’m going to outline the steps I went through to create a custom source component for SSIS. By the end of this post I will show messages that are in a RabbitMQ queue being consumed by an SSIS package.

Preparing the Solution

The first step is to create a new class library project in the solution. I called mine SSISRabbitMQ.RabbitMQSource. Then add references to the following assemblies:

  • Microsoft.SqlServer.Dts.Design
  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.ManagedDTS
  • Microsoft.SqlServer.PipelineHost

These can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

Then using Part 1 as a reference perform the following steps:

  • Add the RabbitMQ client library via NuGet
  • Add a post build step to copy the output assembly into the GAC

Then add a new class file called RabbitMQSource.cs into the project.

Creating the Custom Source

To have SSDT BI recognize the RabbitMQSource class as a source component, two things need to be done:

  • Apply the DtsPipelineComponent attribute
  • Extend and override methods from the PipelineComponent class (MSDN documentation)

The DtsPipelineComponent attribute is similar to the DtsConnection attribute which was applied to the custom connection manager in the first post. This attribute signals to SSDT BI that it is a custom component.

There are a number of properties that need to be set:

  • DisplayName – this is what will be displayed in the SSIS Toolbox in SSDT BI
  • Description – this will also be displayed in the SSIS Toolbox
  • ComponentType – this tells SSDT BI what kind of component the class is (Source, Destination or Transformation)

After applying the attribute and extending the PipelineComponent class so far I have this code:

[DtsPipelineComponent(DisplayName = "RabbitMQ Source",
  ComponentType = ComponentType.SourceAdapter,
  Description = "Connection source for RabbitMQ")]
public class RabbitMQSource : PipelineComponent

Implementing the Custom Source

The first method to override is ProvideComponentProperties. This is the method which is called when the source is added to a package. In here goes all the setup for the source component. In the code below you can see I’m doing three things:

  1. Creating a new output
  2. Creating a custom property for the Queue name
  3. Creating a connection

public override void ProvideComponentProperties()
{
  // Reset the component.
  base.RemoveAllInputsOutputsAndCustomProperties();

  IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
  output.Name = "Output";

  IDTSCustomProperty100 queueName = ComponentMetaData.CustomPropertyCollection.New();
  queueName.Name = "QueueName";
  queueName.Description = "The name of the RabbitMQ queue to read messages from";

  IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
  connection.Name = "RabbitMQ";
  connection.ConnectionManagerID = "RabbitMQ";
}

Something very important, which took me a little while to figure out, is the line:

connection.ConnectionManagerID = "RabbitMQ";

A lot of the other tutorials I read online left this out; however, I found that without it my source didn't work.

Basically, this line says "the package needs to contain a connection called RabbitMQ". That connection will also need to be a RabbitMQConnectionManager, which will become obvious in the AcquireConnections method below.

(In the next post I will be showing how to create a custom user interface for this source, which will allow the user to select the appropriate connection manager and avoid the need to hard-code this value!)

The CreateColumns method looks like this:

private void CreateColumns()
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

  IDTSOutputColumn100 column1 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn1 = output.ExternalMetadataColumnCollection.New();

  IDTSOutputColumn100 column2 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn2 = output.ExternalMetadataColumnCollection.New();

  column1.Name = "MessageContents";
  column1.SetDataTypeProperties(DataType.DT_WSTR, 4000, 0, 0, 0);

  column2.Name = "RoutingKey";
  column2.SetDataTypeProperties(DataType.DT_WSTR, 100, 0, 0, 0);
}

Here I am setting up the output by adding two columns. The first will contain the message contents, and the second will contain the routing key that was used to publish the message to RabbitMQ.

Connecting to External Data Sources

As the source will be making use of a connection manager, there are two methods to override: AcquireConnections and ReleaseConnections. Both methods are called at design time for validation purposes, and at runtime to establish a connection for use during package execution.

The AcquireConnections method looks for the ConnectionManager object on the connection that was created in the ProvideComponentProperties method. In the method below, once it has a reference to the connection manager, it checks that what has been returned is a RabbitMQConnectionManager. It then retrieves the queue name that was set on the source component and calls AcquireConnection on the connection manager itself. This will attempt to establish a connection to the RabbitMQ broker, which will return an IConnection. (For more details on this, check out the first post.) It then stores the IConnection which was returned so it can be used later on in the component, as well as in the ReleaseConnections method.

public override void AcquireConnections(object transaction)
{
  if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
  {
    ConnectionManager connectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(
      ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);

    this.rabbitMqConnectionManager = connectionManager.InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager;

    if (this.rabbitMqConnectionManager == null)
      throw new Exception("Couldn't get the RabbitMQ connection manager");

    this.queueName = (string)ComponentMetaData.CustomPropertyCollection["QueueName"].Value;
    this.rabbitConnection = this.rabbitMqConnectionManager.AcquireConnection(transaction) as IConnection;
  }
}

The next method to override is ReleaseConnections, whose purpose is to clean up any connections created by the AcquireConnections method. All I am doing below is checking that the rabbitMqConnectionManager has been set, then passing the rabbitConnection variable into it to be released; this is the IConnection which was returned in the AcquireConnections method.

public override void ReleaseConnections()
{
  if (rabbitMqConnectionManager != null)
    rabbitMqConnectionManager.ReleaseConnection(rabbitConnection);
}

Validating the Source

To ensure that the source has been correctly configured by the user, I have implemented the Validate method. All this does is check that the user has set the queue name. For more details on the Validate method see the MSDN documentation.

public override DTSValidationStatus Validate()
{
  bool cancel;
  string qName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value as string;

  if (string.IsNullOrWhiteSpace(qName))
  {
    // Validate that the QueueName property is set.
    ComponentMetaData.FireError(0, ComponentMetaData.Name, "The QueueName property must be set", "", 0, out cancel);
    return DTSValidationStatus.VS_ISBROKEN;
  }

  return base.Validate();
}

Getting messages out of RabbitMQ and into SSIS

Now that all the dull setup bits are out of the way, it is time for the interesting part!

There are a number of methods that are executed as part of the SSIS pipeline. In this section I will be showing the PreExecute and PrimeOutput methods. The sequence of execution is:

  1. PrepareForExecute
  2. PreExecute
  3. PrimeOutput for a source and ProcessInput for a destination/transformation
  4. PostExecute
  5. Cleanup
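To make that ordering concrete, here is a rough skeleton of where each stage lands in a PipelineComponent subclass. This is my own illustration, not part of the component being built; the class name and comments are placeholders, and only the override signatures come from the SSIS API:

```csharp
// Illustrative skeleton of the SSIS pipeline lifecycle for a source component.
public class SkeletonSource : PipelineComponent
{
    public override void PrepareForExecute()
    {
        // 1. Called before execution begins; one-off preparation goes here.
    }

    public override void PreExecute()
    {
        // 2. Called once per execution; open connections/channels here.
    }

    public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
    {
        // 3. For a source, rows are pushed into the output buffers here.
        //    A destination or transformation receives rows in ProcessInput instead.
    }

    public override void PostExecute()
    {
        // 4. Called after all rows have been processed.
    }

    public override void Cleanup()
    {
        // 5. Last chance to release anything still held.
    }
}
```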

During execution the PreExecute method is called once; the MSDN documentation therefore recommends placing as much processing logic in here as possible. In the PreExecute method below, I am creating the channel and consumer that will be used in the PrimeOutput method to retrieve messages from the RabbitMQ queue.

public override void PreExecute()
{
  try
  {
    this.consumerChannel = rabbitConnection.CreateModel();
    this.consumerChannel.QueueDeclare(queueName, true, false, false, null);
    this.queueConsumer = new QueueingBasicConsumer(this.consumerChannel);
    this.consumerTag = consumerChannel.BasicConsume(queueName, true, queueConsumer);
  }
  catch (Exception)
  {
    // Let any failure to set up the consumer surface to SSIS.
    throw;
  }
}

The PrimeOutput method is called after PreExecute; it is in here that rows are added to the output buffer. The number of output buffers in the buffers array is determined by the IDTSOutputCollection100. In the ProvideComponentProperties method at the beginning of this post, I added a single output to this collection.

In the PrimeOutput method below, you can see that the component will continually retrieve messages from the RabbitMQ queue until there are none left.

For each message that it retrieves, it calls the AddRow method on the buffer, which causes a new row to be created; it then sets the message contents and the routing key values of the message on the row.

Finally, the SetEndOfRowset method is called to indicate that the component has finished adding rows.

public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
  PipelineBuffer buffer = buffers[0];

  object message;
  bool success;

  while (queueConsumer.IsRunning)
  {
    try
    {
      // Wait up to 100ms for the next message to arrive.
      success = queueConsumer.Queue.Dequeue(100, out message);
    }
    catch (Exception)
    {
      break;
    }

    if (success)
    {
      BasicDeliverEventArgs e = (BasicDeliverEventArgs)message;

      var messageContent = System.Text.Encoding.UTF8.GetString(e.Body);

      buffer.AddRow();
      buffer[0] = messageContent;
      buffer[1] = e.RoutingKey;
    }
    else
    {
      // Nothing arrived within the timeout, so the queue is empty.
      break;
    }
  }

  buffer.SetEndOfRowset();
}
That is enough to start getting messages out of RabbitMQ and into the SSIS package.

Cleaning Up

The last stage is to clean up after ourselves. In the Cleanup method below I am cancelling the consumer that was created in the PreExecute method and then closing the channel.

public override void Cleanup()
{
  if (consumerChannel.IsOpen)
  {
    if (queueConsumer.IsRunning)
      consumerChannel.BasicCancel(consumerTag);

    consumerChannel.Close();
  }
}

The ReleaseConnections method will be called by SSIS to release the connection to RabbitMQ.

What it looks like

It is now time to run the package.

Below you can see the data flow task highlighted in the middle of the screen; to the right is the QueueName property and at the bottom is the RabbitMQ Connection Manager.


Below you can see that the RabbitMQ Source has dequeued 2 messages, which are displayed in the data viewer. It has a tick indicating that it has finished dequeuing messages (this is correct, because I only put two messages into the queue).


Up next I will be building a custom interface for configuring the RabbitMQ Source.

Part 2 – Adding a Custom UI to the Connection Manager

In my previous post I explained how to create a custom connection manager for SSIS to connect to RabbitMQ.

In this post I’m going to extend this further by adding a custom UI where the connection can be configured and tested.

Preparing the Project

The first step is to add a few more references:

  • Microsoft.SqlServer.Dts.Design
  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.PipelineHost

These assemblies can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

The next step is to add two new files: the first is an empty class file which I've called RabbitMQConnectionManagerUI.cs; the second is a Windows Forms class which I've called RabbitMQConnectionManagerUIForm.cs.

For SSDT BI to know that there is a custom user interface associated with this connection manager, the DtsConnection attribute on the RabbitMQConnectionManager class needs to be updated by setting the UITypeName property.

The UITypeName refers to the fully qualified type name of the class added above; in the next section I'll be showing the interface that this class will need to implement. The fully qualified type name looks like this:

SSISRabbitMQ.RabbitMQConnectionManager.RabbitMQConnectionManagerUI, SSISRabbitMQ.RabbitMQConnectionManager, Version=, Culture=neutral, PublicKeyToken=abc123...

The PublicKeyToken can be tricky to get if you’ve never had to find it before. To get the PublicKeyToken of your assembly, open the Visual Studio Tools Command Prompt.

Browse to where the assembly is and type the following:

sn.exe -T SSISRabbitMQ.RabbitMQConnectionManager.dll

This will output the public key token; in this case it is ac1c316408dd3955.

The DtsConnection attribute now looks like this:

[DtsConnection(ConnectionType = "RABBITMQ",
    DisplayName = "RabbitMQ Connection Manager",
    Description = "Connection manager for RabbitMQ",
    UITypeName = "SSISRabbitMQ.RabbitMQConnectionManager.RabbitMQConnectionManagerUI, SSISRabbitMQ.RabbitMQConnectionManager, Version=, Culture=neutral, PublicKeyToken=ac1c316408dd3955")]

Creating the Custom UI

So at this point the project has been setup with all the required classes and the DtsConnection attribute has been updated so SSDT BI will know that there is a custom UI to configure the connection. Now the fun begins.

The first step is to open the RabbitMQConnectionManagerUI class file and have it implement the IDtsConnectionManagerUI interface.

public class RabbitMQConnectionManagerUI : IDtsConnectionManagerUI
{
  public void Delete(IWin32Window parentWindow)
  {
    throw new NotImplementedException();
  }

  public bool Edit(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
  {
    throw new NotImplementedException();
  }

  public void Initialize(ConnectionManager connectionManager, IServiceProvider serviceProvider)
  {
    throw new NotImplementedException();
  }

  public bool New(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
  {
    throw new NotImplementedException();
  }
}

After Visual Studio has created all the stubs, we can see there are four methods which need to be implemented. I will come back to these after creating the Windows Forms class.

Next I opened the RabbitMQConnectionManagerUIForm and added a bunch of UI components so it ended up looking like this:


(All the code for the Form will be available in my GitHub repository so I’m only going to cover off the important/interesting parts.)

The next step was to add a new constructor to the Windows Forms class so that some important parameters can be passed through from the RabbitMQConnectionManagerUI, which is the class that instantiates the Form.

public RabbitMQConnectionManagerUIForm(Microsoft.SqlServer.Dts.Runtime.ConnectionManager connectionManager, IServiceProvider serviceProvider)
  : this()
{
  this.connectionManager = connectionManager;
  this.serviceProvider = serviceProvider;
}

The connection manager that is being passed in is what the Form is actually going to be modifying.

The SetFormValuesFromConnectionManager method updates the Form each time it is loaded with the currently set values for the Connection Manager.

private void SetFormValuesFromConnectionManager()
{
  string hostname = connectionManager.Properties["HostName"].GetValue(connectionManager).ToString();
  string username = connectionManager.Properties["UserName"].GetValue(connectionManager).ToString();
  string password = connectionManager.Properties["Password"].GetValue(connectionManager).ToString();
  string virtualhost = connectionManager.Properties["VirtualHost"].GetValue(connectionManager).ToString();
  string port = connectionManager.Properties["Port"].GetValue(connectionManager).ToString();

  if (!string.IsNullOrWhiteSpace(hostname))
    txtHost.Text = hostname;
  if (!string.IsNullOrWhiteSpace(username))
    txtUserName.Text = username;
  if (!string.IsNullOrWhiteSpace(password))
    txtPassword.Text = password;
  if (!string.IsNullOrWhiteSpace(virtualhost))
    txtVirtualHost.Text = virtualhost;
  if (!string.IsNullOrWhiteSpace(port))
    nmPort.Text = port;
}

One of the important parts of the RabbitMQConnectionManagerUIForm class is what happens when the user clicks the OK button. In the event handler below I'm setting the DialogResult to DialogResult.OK, so that the calling class knows the user has accepted the changes. There is also a call to the UpdateConnectionFromControls method, which goes through each property in the connection manager and updates it to what has been set in the Form.

private void btnOK_Click(object sender, EventArgs e)
{
  UpdateConnectionFromControls();

  this.DialogResult = DialogResult.OK;
}

The UpdateConnectionFromControls method looks like this:

private void UpdateConnectionFromControls()
{
  int port = Convert.ToInt32(nmPort.Value);

  connectionManager.Properties["HostName"].SetValue(connectionManager, txtHost.Text);
  connectionManager.Properties["UserName"].SetValue(connectionManager, txtUserName.Text);
  connectionManager.Properties["Password"].SetValue(connectionManager, txtPassword.Text);
  connectionManager.Properties["VirtualHost"].SetValue(connectionManager, txtVirtualHost.Text);
  connectionManager.Properties["Port"].SetValue(connectionManager, port);
}

The code for the Test Connection button attempts to create a connection to the RabbitMQ broker using the values specified in the form.

private void btnTestConnection_Click(object sender, EventArgs e)
{
  IConnection connection = null;

  try
  {
    ConnectionFactory rabbitMqConnectionFactory = new ConnectionFactory()
    {
      HostName = txtHost.Text,
      VirtualHost = txtVirtualHost.Text,
      UserName = txtUserName.Text,
      Password = txtPassword.Text,
      Port = Convert.ToInt32(nmPort.Value)
    };

    connection = rabbitMqConnectionFactory.CreateConnection();

    if (connection != null && connection.IsOpen)
      MessageBox.Show("Test connection verified", "RabbitMQ Connection Manager", MessageBoxButtons.OK, MessageBoxIcon.Information);
    else
      MessageBox.Show("Test connection failed", "RabbitMQ Connection Manager", MessageBoxButtons.OK, MessageBoxIcon.Error);
  }
  catch (Exception ex)
  {
    MessageBox.Show("Test connection failed!" + Environment.NewLine + ex.Message, "RabbitMQ Connection Manager", MessageBoxButtons.OK, MessageBoxIcon.Error);
  }
  finally
  {
    if (connection != null && connection.IsOpen)
      connection.Close();
    else if (connection != null)
      connection.Abort();
  }
}

This is all I’m going to cover in terms of the Form, for the full class check it out on GitHub.

Now it is time to go back to the RabbitMQConnectionManagerUI. At the moment this class has four methods that need to be filled in:

  • Initialize
  • New
  • Edit
  • Delete

The Initialize method is very straightforward: I am simply setting references in my class to the ConnectionManager and IServiceProvider that were passed through to it, so they can then be passed on to the Form class when it is instantiated, as you will see below.

public void Initialize(ConnectionManager connectionManager, IServiceProvider serviceProvider)
{
  this.connectionManager = connectionManager;
  this.serviceProvider = serviceProvider;
}

The New method is called by SSDT BI after the Initialize method, when the user is creating a new connection manager. The MSDN documentation suggests that this is where the Form should be displayed for editing.

public bool New(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
{
  IDtsClipboardService clipboardService;

  clipboardService = (IDtsClipboardService)serviceProvider.GetService(typeof(IDtsClipboardService));
  if (clipboardService != null)
  {
    // If the connection manager has been copied and pasted, take no action.
    if (clipboardService.IsPasteActive)
      return true;
  }

  return EditConnection(parentWindow);
}

The EditConnection method is used by both the New and Edit methods defined on the interface and is used to launch the Form:

private bool EditConnection(IWin32Window parentWindow)
{
  RabbitMQConnectionManagerUIForm frm = new RabbitMQConnectionManagerUIForm(connectionManager, serviceProvider);

  var result = frm.ShowDialog();

  if (result == DialogResult.OK)
    return true;

  return false;
}

In the Edit method I am simply displaying the Form using the handy helper method from above, so the user can edit the connection.

public bool Edit(IWin32Window parentWindow, Connections connections, ConnectionManagerUIArgs connectionUIArg)
{
  return EditConnection(parentWindow);
}

The Delete method is for any cleanup that is required when the connection manager is deleted from the package. As we don't need to perform any cleanup, the method is very simple:

public void Delete(IWin32Window parentWindow)
{
}

What it looks like

At this point I can now build the project and open SSDT BI to see the result.


Stay Tuned! In the next post I will show how to create a Custom Source which uses the connection manager to read messages from a RabbitMQ queue!

Part 1 – Building a Custom Connection Manager

Following on from my introductory post, here I will be explaining how to develop a Custom Connection Manager for RabbitMQ. This connection manager will then be used in future posts by a custom Source and Destination component.

What is a Connection Manager for?

From the MSDN documentation:

“Integration Services uses connection managers to encapsulate the information needed to connect to an external data source. […] If the connection managers and external data sources supported by Integration Services do not entirely meet your requirements, you can create a custom connection manager.”

By the sounds of it, creating a Connection Manager should be pretty straightforward:

  1. To create a custom connection manager, you have to create a class that inherits from the ConnectionManagerBase base class,
  2. Apply the DtsConnectionAttribute attribute to your new class,
  3. Override the important methods and properties of the base class, including the ConnectionString property and the AcquireConnection method.
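The three steps above can be sketched as a minimal skeleton. This is purely illustrative, with placeholder names, and is not the RabbitMQ implementation built over the rest of this post:

```csharp
// A bare-bones custom connection manager: inherit the base class (step 1),
// apply the DtsConnection attribute (step 2), and override key members (step 3).
[DtsConnection(ConnectionType = "MYCONN",
               DisplayName = "My Connection Manager",
               Description = "Minimal custom connection manager sketch")]
public class MyConnectionManager : ConnectionManagerBase
{
    private string connectionString = string.Empty;

    public override string ConnectionString
    {
        get { return connectionString; }
        set { connectionString = value; }
    }

    public override object AcquireConnection(object txn)
    {
        // Return whatever object represents an open connection
        // to the external system.
        return null;
    }
}
```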


All of these examples are designed against the SDK for SQL Server 2012. You will need to ensure you installed the Client SDK during the installation of SQL Server.

Setting up the solution

I created a new solution and added a new class library project to it called SSISRabbitMQ.RabbitMQConnectionManager.

To this project I then added the RabbitMQ client library via NuGet:

Adding RabbitMQ client via NuGet

The next step is to add a reference to the Microsoft.SqlServer.ManagedDTS.dll, from the documentation:

“The Microsoft.SqlServer.Dts.Runtime namespace contains the classes and interfaces to create packages, custom tasks, and other package control flow elements.”

This assembly can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies


There are a few other assemblies in this directory that will be used in later stages.

For SSDT BI to pick up the custom connection manager it needs to be stored in the GAC, as well as in a few other folders in the SQL Server directory.

The next step is to create a post build action that will automate this each time the project is built. Open the project properties by double clicking on Properties or by going to the Project menu and selecting “SSISRabbitMQ.RabbitMQConnectionManager Properties…”

Click Build Events and then paste the following into the Post Build event text area:

"C:\Program Files (x86)\Microsoft SDKs\Windows\v8.0A\bin\NETFX 4.0 Tools\gacutil.exe" -u $(TargetName)
"C:\Program Files (x86)\Microsoft SDKs\Windows\v8.0A\bin\NETFX 4.0 Tools\gacutil.exe" -iF $(TargetFileName)
copy $(TargetFileName) "C:\Program Files (x86)\Microsoft SQL Server\110\DTS\Connections\$(TargetFileName)" /y

copy "$(TargetDir)RabbitMQ.Client.dll" "C:\Program Files (x86)\Microsoft Visual Studio 10.0\Common7\IDE\PublicAssemblies" /y
copy "$(TargetDir)RabbitMQ.Client.dll" "C:\Program Files\Microsoft SQL Server\110\DTS\Binn" /y

The last step in setting up the project is to sign the assembly. The reason for this is that the assembly being built is going into the GAC, and all assemblies that go into the GAC need to be signed with a strong name. See here for more details.

With the properties still open, click on "Signing".

Tick the "Sign the assembly" checkbox and select <New…> from the "Choose a strong name key file" drop down; I created one called ssisrabbitmq.snk.


The solution so far looks like this:


Creating the Connection Manager

Now that the solution is set up with all dependencies, it is time to start on the actual connection manager class.

Start by adding a new class file, RabbitMQConnectionManager.cs, and apply the DtsConnection attribute to the class. There are three properties which will need to be set:

  • ConnectionType
  • DisplayName
  • Description

These are the properties which will appear in SQL Server Data Tools BI when a user goes to add a new connection.

The next step is to extend the ConnectionManagerBase class.

This is the code so far:

[DtsConnection(ConnectionType = "RABBITMQ",
               DisplayName = "RabbitMQ",
               Description = "Connection Manager for RabbitMQ")]
public class RabbitMQConnectionManager : ConnectionManagerBase

The next step is to add a number of properties which will be used for the actual connection to the RabbitMQ broker:

public string HostName { get; set; }
public string VirtualHost { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public int Port { get; set; }

For the sake of speedy development, in the constructor I’m setting the default properties for the connection.

public RabbitMQConnectionManager()
{
  HostName = "localhost";
  VirtualHost = "/";
  UserName = "guest";
  Password = "guest";
  Port = 5672;
}

Validating the Connection Manager

By overriding the Validate method, SSDT BI is able to make sure that the user has configured the connection correctly. If the method returns DTSExecResult.Failure then the package won’t begin execution.

public override Microsoft.SqlServer.Dts.Runtime.DTSExecResult Validate(Microsoft.SqlServer.Dts.Runtime.IDTSInfoEvents infoEvents)
{
  if (string.IsNullOrWhiteSpace(HostName))
    return DTSExecResult.Failure;
  else if (string.IsNullOrWhiteSpace(VirtualHost))
    return DTSExecResult.Failure;
  else if (string.IsNullOrWhiteSpace(UserName))
    return DTSExecResult.Failure;
  else if (string.IsNullOrWhiteSpace(Password))
    return DTSExecResult.Failure;
  else if (Port <= 0)
    return DTSExecResult.Failure;

  return DTSExecResult.Success;
}

Setting up the connection to RabbitMQ

The next two methods to override are AcquireConnection and ReleaseConnection.

These methods are called at various times during Design Time and Run Time.

public override object AcquireConnection(object txn)
{
  ConnectionFactory connFactory = new ConnectionFactory()
  {
    UserName = UserName,
    HostName = HostName,
    Password = Password,
    Port = Port,
    VirtualHost = VirtualHost
  };

  var connection = connFactory.CreateConnection();

  return connection;
}

In the AcquireConnection method we are basically just setting up the connection to RabbitMQ. (As I am aiming to be as simple as possible I’m leaving out a lot of exception handling, so beware!)
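Since exception handling has deliberately been left out, here is a slightly more defensive sketch of AcquireConnection. This is my own variation, not the post's code; it assumes the BrokerUnreachableException type from the RabbitMQ.Client library:

```csharp
// Defensive variant of AcquireConnection (a sketch, not the post's actual code):
// surface an unreachable broker as a descriptive error instead of a raw failure.
public override object AcquireConnection(object txn)
{
    ConnectionFactory connFactory = new ConnectionFactory()
    {
        UserName = UserName,
        HostName = HostName,
        Password = Password,
        Port = Port,
        VirtualHost = VirtualHost
    };

    try
    {
        return connFactory.CreateConnection();
    }
    catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException ex)
    {
        throw new Exception(
            string.Format("Unable to connect to RabbitMQ at {0}:{1}", HostName, Port), ex);
    }
}
```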

If you aren’t familiar with the RabbitMQ API for C# then it is definitely worthwhile to have a read through the user guide which can be found on the RabbitMQ website here.

The next method is ReleaseConnection:

public override void ReleaseConnection(object connection)
{
  if (connection != null)
    ((IConnection)connection).Close();
}

Here we are closing the connection.
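To make the pairing concrete, here is a hedged sketch of how calling code would typically use the two methods together; cm is an assumed RabbitMQConnectionManager instance, and the variable names are illustrative:

```csharp
// Illustrative usage of the AcquireConnection/ReleaseConnection pair.
object conn = cm.AcquireConnection(null);   // returns an open IConnection
try
{
    IConnection rabbit = (IConnection)conn;
    // ... create channels, publish or consume messages ...
}
finally
{
    cm.ReleaseConnection(conn);             // closes the underlying connection
}
```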

The solution so far

We now have a custom connection manager that users can use to add RabbitMQ connections to their packages. This is obviously not much use until the Source and Destination components have been added.

After building the solution and creating a new Integration Services project in SSDT BI I can now add the connection manager into a new package:


The properties are also visible:


That is it for now on the custom connection manager.

Stay tuned, in the next post I will be showing how to create a custom user interface for the RabbitMQConnectionManager.


Something that I forgot to mention: because the post build action calls gacutil.exe, you will need to run Visual Studio as Administrator.