For one of the interfaces I was working on we get a flat file with information for multiple identifiers. We needed to split this file so that we only have 1 identifier per file. For this we create a new disassemble pipeline component.
We will add 2 attributes to the class. The first is StartIndex, an integer which will define the location of a unique identifier in the file, for example a identifiercode. The second is Length, an integer that indicates how long the identifier is.
Also add a Queue to the class, which will hold the new messages.
/// <summary>
/// Split an incoming message.
/// </summary>
public class FileSplitter : IBaseComponent,
IDisassemblerComponent,
IComponentUI,
IPersistPropertyBag
{
/// <summary>
/// Used to hold disassembled messages.
/// </summary>
private System.Collections.Queue qOutputMsgs =
new System.Collections.Queue();
/// <summary>
/// Namespace used to set the promoted properties.
/// </summary>
private string systemPropertiesNamespace =
@"http://schemas.microsoft.com/BizTalk/2003/system-properties";
/// <summary>
/// The start index of the identifier.
/// </summary>
private int _startIndex;
/// <summary>
/// The start index of the identifier.
/// </summary>
public int StartIndex
{
get
{
return _startIndex;
}
set
{
_startIndex = value;
}
}
/// <summary>
/// The length of the identifier.
/// </summary>
private int _length;
/// <summary>
/// The length of the identifier.
/// </summary>
public int Length
{
get
{
return _length;
}
set
{
_length = value;
}
}
}
|
Now we will create the disassemble method, this is called in the disassemble stage of the pipeline. Here we will read in the message line by line, and check in each line if the identifier in it is the same as the identifier in the previous line. If it is the same, we add it to the current message, if not, we create a new message.
/// <summary>
/// Disassemble method is used to initiate the disassembling of the
/// message in this pipeline component.
/// </summary>
/// <param name="pc">Pipeline context.</param>
/// <param name="inmsg">Input message.</param>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
// The namespace in which the new messages should be
string namespaceURI =
"http://www.company.com/BizTalk/Application/v1";
// The root element for the new messages
string rootElement = "File_Incoming";
// Stringbuilder used to create the new message
StringBuilder messageString = new StringBuilder();
// Stream that will hold the original message's data
Stream originalMessageStream;
// Get the original file name
string srcFileName = pInMsg.Context.Read("ReceivedFileName",
"http://schemas.microsoft.com/BizTalk/2003/"
+ "file-properties").ToString().Replace(".txt", "");
// Counter to make the outgoing filename unique
int count = 0;
try
{
// Fetch original message's data
originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
}
catch (Exception ex)
{
// Something went wrong
throw new ApplicationException("Error in reading original
message: " + ex.Message);
}
try
{
// Create a StreamReader to read the original message's data
StreamReader sr = new StreamReader(originalMessageStream);
// The identifier for the last line
string curIdentifier = string.Empty;
// Go through all the lines in the original message
while (sr.Peek() >= 0)
{
// Read the next line
string line = sr.ReadLine();
// Get the identifier in this line
string identifier = line.Substring(_startIndex, _length);
// Check if this is the same identifier as in the previous
// line
if (!identifier.Equals(curIdentifier))
{
// If not, close current identifier if any
// This prevents an empty message to be created the
// first time we come here
if (!string.IsNullOrEmpty(curIdentifier))
{
// Queue the message
CreateOutgoingMessage(pContext,
messageString.ToString(), namespaceURI,
rootElement,
String.Format("{0}_{1}", srcFileName, count));
// Clear the message for the next message
messageString.Remove(0, messageString.Length);
// Next message will be in a unique file
count++;
}
// From now on we want to compare to this identifier
curIdentifier = identifier;
}
// Add the line to the current message
messageString.Append(line + Environment.NewLine);
}
// Close current identifier if any
if (!string.IsNullOrEmpty(curIdentifier))
{
// Queue the message
CreateOutgoingMessage(pContext, messageString.ToString(),
namespaceURI, rootElement,
String.Format("{0}_{1}", srcFileName, count));
}
// Close the StreamReader
sr.Close();
}
catch (Exception ex)
{
// Something went wrong
throw new ApplicationException("Error in writing outgoing "
+ "messages: " + ex.Message);
}
finally
{
// Close the StringBuilder
messageString = null;
}
}
|
Now we will implement the GetNext method. This method is used in the pipeline to pass the messages to the next stage.
/// <summary>
/// Used to pass output messages to next stage.
/// </summary>
public IBaseMessage GetNext(IPipelineContext pContext)
{
// Check if there any messages in the queue
if (qOutputMsgs.Count > 0)
{
// Get the next message
return (IBaseMessage)qOutputMsgs.Dequeue();
}
return null;
}
Finally we have to create the method that puts the messages in the queue.
/// <summary>
/// Queue outgoing messages.
/// </summary>
/// <param name="pContext">Pipeline context.</param>
/// <param name="messageString">The string with the new (debatched) message.</param>
/// <param name="namespaceURI">The namespace we want to use for the message.</param>
/// <param name="rootElement">The root element for the message.</param>
/// <param name="sourceFileName">The file name we want to use for the new
message.</param>
private void CreateOutgoingMessage(IPipelineContext pContext,
String messageString, string namespaceURI, string rootElement,
string sourceFileName)
{
// The message to be put in the queue for further processing in
// the pipeline
IBaseMessage outMsg;
try
{
// Create outgoing message
outMsg = pContext.GetMessageFactory().CreateMessage();
// Add the body part
outMsg.AddPart("Body",
pContext.GetMessageFactory().CreateMessagePart(), true);
// Add the namespace and root element
outMsg.Context.Promote("MessageType",
systemPropertiesNamespace, namespaceURI + "#"
+ rootElement.Replace("ns0:", ""));
// Set the filename to be used, this can be used in BizTalk
// by using the %SourceFileName% identifier
outMsg.Context.Promote("ReceivedFileName",
"http://schemas.microsoft.com/BizTalk/2003/file-properties",
sourceFileName);
// Get the outgoing message as bytes
byte[] bufferOutgoingMessage =
System.Text.ASCIIEncoding.ASCII.GetBytes(messageString);
// Set the data of the outgoing message
outMsg.BodyPart.Data = new MemoryStream(bufferOutgoingMessage);
// Place the message in the queue
qOutputMsgs.Enqueue(outMsg);
}
catch (Exception ex)
{
// Something went wrong
throw new ApplicationException("Error in queueing outgoing "
+ "messages: " + ex.Message);
}
}
|
And finally, the whole class.
using System;
using System.IO;
using System.Text;
using System.ComponentModel;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;
namespace Company.BizTalk.Other.PipelineComponents
{
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
[System.Runtime.InteropServices.Guid("F0DF352C-657B-42AD-A26D-730E8031CD42")]
/// <summary>
/// Split an incoming message.
/// </summary>
public class FileSplitter : IBaseComponent,
IDisassemblerComponent,
IComponentUI,
IPersistPropertyBag
{
/// <summary>
/// Used to hold disassembled messages.
/// </summary>
private System.Collections.Queue qOutputMsgs =
new System.Collections.Queue();
/// <summary>
/// Namespace used to set the promoted properties.
/// </summary>
private string systemPropertiesNamespace =
@"http://schemas.microsoft.com/BizTalk/2003/system-properties";
/// <summary>
/// The start index of the identifier.
/// </summary>
private int _startIndex;
/// <summary>
/// The start index of the identifier.
/// </summary>
public int StartIndex
{
get
{
return _startIndex;
}
set
{
_startIndex = value;
}
}
/// <summary>
/// The length of the identifier.
/// </summary>
private int _length;
/// <summary>
/// The length of the identifier.
/// </summary>
public int Length
{
get
{
return _length;
}
set
{
_length = value;
}
}
}
/// <summary>
/// Default constructor
/// </summary>
public FileSplitter()
{
}
/// <summary>
/// Description of pipeline
/// </summary>
public string Description
{
get
{
return "Component to batch one flat file message into multiple
messages";
}
}
/// <summary>
/// Name of pipeline
/// </summary>
public string Name
{
get
{
return "FileSplitter";
}
}
/// <summary>
/// Pipeline version
/// </summary>
public string Version
{
get
{
return "1.0.0.0";
}
}
/// <summary>
/// Returns collecton of errors
/// </summary>
public System.Collections.IEnumerator Validate(object projectSystem)
{
return null;
}
/// <summary>
/// Returns icon of pipeline
/// </summary>
public System.IntPtr Icon
{
get
{
return new System.IntPtr();
}
}
/// <summary>
/// Class GUID
/// </summary>
public void GetClassID(out Guid classID)
{
classID = new Guid("F0DF352C-657B-42AD-A26D-730E8031CD42");
}
/// <summary>
/// InitNew
/// </summary>
public void InitNew()
{
}
/// <summary>
/// Load property from property bag
/// </summary>
public void Load(IPropertyBag propertyBag, int errorLog)
{
object val = null;
try
{
propertyBag.Read("StartIndex", out val, 0);
}
catch (ArgumentException)
{
val = 0;
}
catch (NullReferenceException)
{
val = 0;
}
catch (Exception ex)
{
throw new ApplicationException(ex.Message);
}
if (val != null)
{
this._startIndex = ((int)(val));
}
val = null;
try
{
propertyBag.Read("Length", out val, 0);
}
catch (ArgumentException)
{
val = 0;
}
catch (NullReferenceException)
{
val = 0;
}
catch (Exception ex)
{
throw new ApplicationException(ex.Message);
}
if (val != null)
{
this._length = ((int)(val));
}
}
/// <summary>
/// Write property to property bag
/// </summary>
public void Save(IPropertyBag propertyBag, bool clearDirty,
bool saveAllProperties)
{
object val = (object)StartIndex;
propertyBag.Write("StartIndex", ref val);
val = (object)Length;
propertyBag.Write("Length", ref val);
}
/// <summary>
/// Disassemble method is used to initiate the disassembling of the
/// message in this pipeline component.
/// </summary>
/// <param name="pc">Pipeline context.</param>
/// <param name="inmsg">Input message.</param>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
// The namespace in which the new messages should be
string namespaceURI =
"http://www.company.com/BizTalk/Application/v1";
// The root element for the new messages
string rootElement = "File_Incoming";
// Stringbuilder used to create the new message
StringBuilder messageString = new StringBuilder();
// Stream that will hold the original message's data
Stream originalMessageStream;
// Get the original file name
string srcFileName = pInMsg.Context.Read("ReceivedFileName",
"http://schemas.microsoft.com/BizTalk/2003/"
+ "file-properties").ToString().Replace(".txt", "");
// Counter to make the outgoing filename unique
int count = 0;
try
{
// Fetch original message's data
originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
}
catch (Exception ex)
{
// Something went wrong
throw new ApplicationException("Error in reading original
message: " + ex.Message);
}
try
{
// Create a StreamReader to read the original message's data
StreamReader sr = new StreamReader(originalMessageStream);
// The identifier for the last line
string curIdentifier = string.Empty;
// Go through all the lines in the original message
while (sr.Peek() >= 0)
{
// Read the next line
string line = sr.ReadLine();
// Get the identifier in this line
string identifier = line.Substring(_startIndex, _length);
// Check if this is the same identifier as in the previous
// line
if (!identifier.Equals(curIdentifier))
{
// If not, close current identifier if any
// This prevents an empty message to be created the
// first time we come here
if (!string.IsNullOrEmpty(curIdentifier))
{
// Queue the message
CreateOutgoingMessage(pContext,
messageString.ToString(), namespaceURI,
rootElement,
String.Format("{0}_{1}", srcFileName, count));
// Clear the message for the next message
messageString.Remove(0, messageString.Length);
// Next message will be in a unique file
count++;
}
// From now on we want to compare to this identifier
curIdentifier = identifier;
}
// Add the line to the current message
messageString.Append(line + Environment.NewLine);
}
// Close current identifier if any
if (!string.IsNullOrEmpty(curIdentifier))
{
// Queue the message
CreateOutgoingMessage(pContext, messageString.ToString(),
namespaceURI, rootElement,
String.Format("{0}_{1}", srcFileName, count));
}
// Close the StreamReader
sr.Close();
}
catch (Exception ex)
{
// Something went wrong
throw new ApplicationException("Error in writing outgoing "
+ "messages: " + ex.Message);
}
finally
{
// Close the StringBuilder
messageString = null;
}
}
/// <summary>
/// Used to pass output messages to next stage.
/// </summary>
public IBaseMessage GetNext(IPipelineContext pContext)
{
// Check if there any messages in the queue
if (qOutputMsgs.Count > 0)
{
// Get the next message
return (IBaseMessage)qOutputMsgs.Dequeue();
}
return null;
}
/// <summary>
/// Queue outgoing messages.
/// </summary>
/// <param name="pContext">Pipeline context.</param>
/// <param name="messageString">The string with the new (debatched)
/// message.</param>
/// <param name="namespaceURI">The namespace we want to use for the
/// message.</param>
/// <param name="rootElement">The root element for the message.</param>
/// <param name="sourceFileName">The file name we want to use for the new
/// message.</param>
private void CreateOutgoingMessage(IPipelineContext pContext,
String messageString, string namespaceURI, string rootElement,
string sourceFileName)
{
// The message to be put in the queue for further processing in
// the pipeline
IBaseMessage outMsg;
try
{
// Create outgoing message
outMsg = pContext.GetMessageFactory().CreateMessage();
// Add the body part
outMsg.AddPart("Body",
pContext.GetMessageFactory().CreateMessagePart(), true);
// Add the namespace and root element
outMsg.Context.Promote("MessageType",
systemPropertiesNamespace, namespaceURI + "#"
+ rootElement.Replace("ns0:", ""));
// Set the filename to be used, this can be used in BizTalk
// by using the %SourceFileName% identifier
outMsg.Context.Promote("ReceivedFileName",
"http://schemas.microsoft.com/BizTalk/2003/file-properties",
sourceFileName);
// Get the outgoing message as bytes
byte[] bufferOutgoingMessage =
System.Text.ASCIIEncoding.ASCII.GetBytes(messageString);
// Set the data of the outgoing message
outMsg.BodyPart.Data = new MemoryStream(bufferOutgoingMessage);
// Place the message in the queue
qOutputMsgs.Enqueue(outMsg);
}
catch (Exception ex)
{
// Something went wrong
throw new ApplicationException("Error in queueing outgoing "
+ "messages: " + ex.Message);
}
}
}
}
|
Geen opmerkingen:
Een reactie posten