Part 3 – Building a Custom Source Component

So far in this series I have shown how to create a custom connection manager and how to create a user-friendly interface to configure and test it. In this post I’m going to outline the steps I went through to create a custom source component for SSIS. By the end of the post, an SSIS package will be consuming messages from a RabbitMQ queue.

Preparing the Solution

The first step is to create a new class library project in the solution. I called mine SSISRabbitMQ.RabbitMQSource. Then add references to the following assemblies:

  • Microsoft.SqlServer.Dts.Design
  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.ManagedDTS
  • Microsoft.SqlServer.PipelineHost

These can be found in C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies

Then, using Part 1 as a reference, perform the following steps:

  • Add the RabbitMQ client library via NuGet
  • Add a post-build step to copy the output assembly into the GAC

Then add a new class file called RabbitMQSource.cs into the project.

Creating the Custom Source

To have SSDT BI recognize the RabbitMQSource class as a source component, two things need to be done:

  • Apply the DtsPipelineComponent attribute
  • Extend and override methods from the PipelineComponent class (MSDN documentation)

The DtsPipelineComponent attribute is similar to the DtsConnection attribute that was applied to the custom connection manager in the first post. It signals to SSDT BI that the class is a custom pipeline component.

There are a number of properties that need to be set:

  • DisplayName – this is what will be displayed in the SSIS Toolbox in SSDT BI
  • Description – this will also be displayed in the SSIS Toolbox
  • ComponentType – this tells SSDT BI what kind of component the class is (Source, Destination or Transformation)

After applying the attribute and extending the PipelineComponent class, this is the code I have so far:

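// DtsPipelineComponent, ComponentType and PipelineComponent live in this namespace.
using Microsoft.SqlServer.Dts.Pipeline;

[DtsPipelineComponent(DisplayName = "RabbitMQ Source",
  ComponentType = ComponentType.SourceAdapter,
  Description = "Connection source for RabbitMQ")]
public class RabbitMQSource : PipelineComponent
{
}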

Implementing the Custom Source

The first method to override is ProvideComponentProperties, which is called when the source is added to a package. All the setup for the source component goes in here. In the code below I’m doing three things:

  1. Creating a new output
  2. Creating a custom property for the Queue name
  3. Creating a connection

public override void ProvideComponentProperties()
{
  // Reset the component.
  base.RemoveAllInputsOutputsAndCustomProperties();
  ComponentMetaData.RuntimeConnectionCollection.RemoveAll();

  IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
  output.Name = "Output";

  IDTSCustomProperty100 queueName = ComponentMetaData.CustomPropertyCollection.New();
  queueName.Name = "QueueName";
  queueName.Description = "The name of the RabbitMQ queue to read messages from";

  IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
  connection.Name = "RabbitMQ";
  connection.ConnectionManagerID = "RabbitMQ";

  CreateColumns();
}

One line is very important, and it took me a little while to figure out:

connection.ConnectionManagerID = "RabbitMQ";

A lot of the other tutorials that I read online left this out, but I found that my source didn’t work without it.

Basically, this line is saying “In the package there needs to be a connection called RabbitMQ”. That connection also needs to be a RabbitMQConnectionManager, which will become obvious in the AcquireConnections method below.

(In the next post I will show how to create a custom user interface for this source that lets the user select the appropriate connection manager, which avoids the need to hard-code this value!)

The CreateColumns method looks like this:

private void CreateColumns()
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

  output.OutputColumnCollection.RemoveAll();
  output.ExternalMetadataColumnCollection.RemoveAll();

  IDTSOutputColumn100 column1 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn1 = output.ExternalMetadataColumnCollection.New();

  IDTSOutputColumn100 column2 = output.OutputColumnCollection.New();
  IDTSExternalMetadataColumn100 exColumn2 = output.ExternalMetadataColumnCollection.New();

  column1.Name = "MessageContents";
  column1.SetDataTypeProperties(DataType.DT_WSTR, 4000, 0, 0, 0);

  column2.Name = "RoutingKey";
  column2.SetDataTypeProperties(DataType.DT_WSTR, 100, 0, 0, 0);
}

In here I am setting up the Output, adding two columns. The first will contain the message contents, and the second column will contain the routing key that was used to publish the message in RabbitMQ.
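One thing to keep in mind is that MessageContents is declared as DT_WSTR with a length of 4000, so longer message bodies will not fit in the column. The sketch below is one possible guard and is not part of the component above: a hypothetical helper (ColumnText.Fit is my own name) that truncates a string to the declared column length before it is written to the buffer. Whether truncating is acceptable, as opposed to failing the row or using a different column type, is an assumption that depends on your messages.

```csharp
using System;

public static class ColumnText
{
    // Hypothetical helper: returns value cut down to at most maxLength characters.
    // Null and already-short values are returned unchanged.
    public static string Fit(string value, int maxLength)
    {
        if (maxLength < 0)
        {
            throw new ArgumentOutOfRangeException("maxLength");
        }

        if (value == null || value.Length <= maxLength)
        {
            return value;
        }

        return value.Substring(0, maxLength);
    }
}
```

In the PrimeOutput method further down, this could be used as buffer[0] = ColumnText.Fit(messageContent, 4000); so that the value always matches the width given to SetDataTypeProperties.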

Connecting to External Data Sources

As the source makes use of a connection manager, there are two methods to override: AcquireConnections and ReleaseConnections. Both are called at design time, for validation purposes, and at runtime, where the connection is used during package execution.

The AcquireConnections method looks for the ConnectionManager object on the connection that was created in the ProvideComponentProperties method. In the method below, once it has a reference to the connection manager (the DtsConvert.GetWrapper call), it checks that what has been returned is a RabbitMQConnectionManager (the as cast and the null check that follows). It then retrieves the queue name that was set on the source component and calls AcquireConnection on the connection manager itself. This attempts to establish a connection to the RabbitMQ broker and returns an IConnection. (For more details on this, check out the first post.) The IConnection is stored so it can be used later on in the component, as well as in the ReleaseConnections method.

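// Fields shared by the connection and execution methods.
private RabbitMQConnectionManager.RabbitMQConnectionManager rabbitMqConnectionManager;
private IConnection rabbitConnection;
private string queueName;

public override void AcquireConnections(object transaction)
{
  if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
  {
    // Unwrap the runtime connection into the managed ConnectionManager.
    ConnectionManager connectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(
      ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);

    this.rabbitMqConnectionManager = connectionManager.InnerObject as RabbitMQConnectionManager.RabbitMQConnectionManager;

    if (this.rabbitMqConnectionManager == null)
      throw new Exception("Couldn't get the RabbitMQ connection manager");

    // Value is returned as an object, so convert it to a string explicitly.
    this.queueName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value as string;
    this.rabbitConnection = this.rabbitMqConnectionManager.AcquireConnection(transaction) as IConnection;
  }
}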

The next method to override is ReleaseConnections, whose purpose is to clean up any connections created by the AcquireConnections method. All I am doing below is checking that rabbitMqConnectionManager has been set and then passing it the rabbitConnection variable to be released. This is the IConnection that was returned in the AcquireConnections method.

public override void ReleaseConnections()
{
  if (rabbitMqConnectionManager != null)
  {
    this.rabbitMqConnectionManager.ReleaseConnection(rabbitConnection);
  }
}

Validating the Source

To ensure that the source has been correctly configured by the user, I have implemented the Validate method. All this does is check that the user has set the queue name. For more details on the Validate method see the MSDN documentation.

public override DTSValidationStatus Validate()
{
  bool cancel;
  // Value is returned as an object, so convert it to a string explicitly.
  string qName = ComponentMetaData.CustomPropertyCollection["QueueName"].Value as string;

  // Validate that the QueueName property is set.
  if (string.IsNullOrWhiteSpace(qName))
  {
    ComponentMetaData.FireError(0, ComponentMetaData.Name, "The QueueName property must be set", "", 0, out cancel);
    return DTSValidationStatus.VS_ISBROKEN;
  }

  return base.Validate();
}

Getting messages out of RabbitMQ and into SSIS

Now that all the dull setup bits are out of the way, it is time for the interesting part!

A number of methods are executed as part of the SSIS pipeline. In this section I will show the PreExecute and PrimeOutput methods. The sequence of execution is:

  1. PrepareForExecute
  2. PreExecute
  3. PrimeOutput for a source and ProcessInput for a destination/transformation
  4. PostExecute
  5. Cleanup

During execution the PreExecute method is called once, so the MSDN documentation recommends that you place as much logic in here as possible. In the PreExecute method below, I am creating the channel and consumer that will be used in the PrimeOutput method to retrieve messages from the RabbitMQ queue.

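// Fields created in PreExecute and used by PrimeOutput and Cleanup.
private IModel consumerChannel;
private QueueingBasicConsumer queueConsumer;
private string consumerTag;

public override void PreExecute()
{
  try
  {
    this.consumerChannel = rabbitConnection.CreateModel();
    // Declare the queue as durable, non-exclusive and not auto-deleted.
    this.consumerChannel.QueueDeclare(queueName, true, false, false, null);
    this.queueConsumer = new QueueingBasicConsumer(this.consumerChannel);
    // The second argument (true) means messages are acknowledged automatically.
    this.consumerTag = consumerChannel.BasicConsume(queueName, true, queueConsumer);
  }
  catch (Exception)
  {
    ReleaseConnections();
    throw;
  }
}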

The PrimeOutput method is called after PreExecute, and it is in here that rows are added to the output buffer. The number of output buffers in the buffers array is determined by the IDTSOutputCollection100. In the ProvideComponentProperties method at the beginning of this post I added a single output to this collection.

In the PrimeOutput method below, you can see that the component keeps retrieving messages from the RabbitMQ queue until there are none left, which here means that no message arrives within 100 milliseconds.

For each message that it retrieves, it calls the AddRow method on the buffer, which creates a new row, and then sets the message contents and the routing key on that row.

Finally, the SetEndOfRowset method is called to tell the data flow engine that the component has finished adding rows.

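public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
  IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
  // There is only one output, so there is only one buffer.
  PipelineBuffer buffer = buffers[0];

  object message;
  bool success;

  while (queueConsumer.IsRunning)
  {
    try
    {
      // Wait up to 100 milliseconds for the next message.
      success = queueConsumer.Queue.Dequeue(100, out message);
    }
    catch (Exception)
    {
      // Treat any failure while dequeuing as the end of the data.
      break;
    }

    if (success)
    {
      BasicDeliverEventArgs e = (BasicDeliverEventArgs)message;

      var messageContent = System.Text.Encoding.UTF8.GetString(e.Body);

      // Column 0 is MessageContents and column 1 is RoutingKey.
      buffer.AddRow();
      buffer[0] = messageContent;
      buffer[1] = e.RoutingKey;
    }
    else
    {
      // Nothing arrived within the timeout, so the queue is considered empty.
      break;
    }
  }

  buffer.SetEndOfRowset();
}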

That is enough to start getting messages out of RabbitMQ and into the SSIS package.
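The drain-until-quiet loop in PrimeOutput can be tried outside SSIS. The sketch below is a stand-in rather than the component's code: it assumes a BlockingCollection<string> from the .NET base class library in place of the RabbitMQ consumer queue, and the DrainDemo class and Drain method are hypothetical names. TryTake with a timeout plays the role of Dequeue(100, out message), returning false when nothing arrives in time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class DrainDemo
{
    // Takes items until none arrives within timeoutMs, then returns what was collected.
    public static List<string> Drain(BlockingCollection<string> queue, int timeoutMs)
    {
        var rows = new List<string>();
        string message;

        // TryTake returns false once the timeout passes with no item available,
        // which ends the loop in the same way as the else branch in PrimeOutput.
        while (queue.TryTake(out message, timeoutMs))
        {
            rows.Add(message);
        }

        return rows;
    }
}
```

Calling DrainDemo.Drain(queue, 100) on a queue holding two messages returns both and then gives up after waiting 100 milliseconds for a third. The trade-off is the same as in the source component: a producer that pauses for longer than the timeout looks like an empty queue.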

Cleaning Up

The last stage is to clean up after ourselves. In the Cleanup method below I am cancelling the consumer that was created in the PreExecute method and then closing the channel.

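public override void Cleanup()
{
  // PreExecute may have failed before these fields were assigned, so check for null first.
  if (consumerChannel != null && consumerChannel.IsOpen)
  {
    if (queueConsumer != null && queueConsumer.IsRunning)
    {
      consumerChannel.BasicCancel(consumerTag);
    }
    consumerChannel.Close();
  }
  base.Cleanup();
}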

The ReleaseConnections method will be called by SSIS to release the connection to RabbitMQ.

What it looks like

It is now time to run the package.

Below you can see the data flow tasks highlighted in the middle of the screen. To the right is the QueueName property, and down the bottom is the RabbitMQ Connection Manager.

[Screenshot: ssis-rabbitmq-source-1]

Below you can see that the RabbitMQ Source has dequeued 2 messages, which are displayed in the data viewer. It has a tick indicating that it has finished dequeuing messages (this is correct because I only put two messages into the queue).

[Screenshot: ssis-rabbitmq-source-2]

Up next I will be building a custom interface for configuring the RabbitMQ Source.

3 thoughts on “Part 3 – Building a Custom Source Component”

  1. Nathan says:

    After building this step of the solution, I can’t seem to get the RabbitMQSource to appear in SSDT. Anything I should double check to see if I can resolve that? Thanks.

    • KenR says:

      Do you have the option to create a RabbitMQ Connection Manager?

      Two things I can think of:

      1. I had to put the RabbitMQ.Client library into the GAC, so you might need to do that as well (but it shouldn’t stop you seeing the component in SSDT)
      2. Is the platform target set to x86 in your project properties?

  2. Junoreactor says:

    Thank you for your excellent work, but I have a problem.
    When I execute my package (RabbitMQ Source component linked to a Union All component), nothing happens in the data flow and no error is raised.
    Yet the RabbitMQ component is correctly configured:
    The server connection is OK, and so is the queue name (it works well in a Windows Forms project).
    When I execute my package from the command line via DTExec.exe, the error is:
    The object reference not set to an instance of an object to SSISRabbitMQSource.RabbitMqSource.PreExecute().
    If I use debugger.launch() in the PreExecute method, it does not pass through the PreExecute method.
    But it does pass through the ProvideComponentProperties and Validate methods.
    Please help
    Thanks
