
BizTalk: Stamping a Received Message in a Pipeline


This code stamps each incoming message with an XML element called SequenceNo. The sequence number lets you order messages without changing your existing application. Because the number is written into the body XML itself, downstream code does not have to handle an additional message part, and you should need very little modification to your existing code to implement it.


Creating a pipeline in BizTalk Server is relatively easy. It requires you to:

1) Implement the pipeline component interfaces in an assembly (the key method is Execute).

2) Create a BizTalk project that contains a pipeline item (a file with the .btp extension). Drag the component from the assembly you created above onto the Validate stage.

3) Deploy the project and move the pipeline into the right BizTalk application. After that, configure the receive location to use the pipeline.
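As a rough illustration of step 1, the skeleton below shows the interfaces a custom pipeline component usually implements. It is a minimal sketch, not the author's actual class: the class name SequenceStampComponent and the GUID are placeholders invented for this example, and it assumes the project references the BizTalk pipeline assembly (Microsoft.BizTalk.Pipeline.dll) from your BizTalk installation.

```csharp
using System;
using System.Collections;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;

// Hypothetical component: the name and GUID below are placeholders.
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[ComponentCategory(CategoryTypes.CATID_Validate)] // lets the designer place it in the Validate stage
[System.Runtime.InteropServices.Guid("00000000-0000-0000-0000-000000000001")]
public class SequenceStampComponent :
    IBaseComponent,
    IComponentUI,
    IPersistPropertyBag,
    Microsoft.BizTalk.Component.Interop.IComponent
{
    // IBaseComponent: metadata shown in the pipeline designer.
    public string Name { get { return "SequenceStampComponent"; } }
    public string Version { get { return "1.0"; } }
    public string Description { get { return "Stamps a SequenceNo element onto the message body."; } }

    // IComponentUI: no custom icon and no design-time validation errors.
    public IntPtr Icon { get { return IntPtr.Zero; } }
    public IEnumerator Validate(object projectSystem) { return null; }

    // IPersistPropertyBag: this sketch has no configurable properties.
    public void GetClassID(out Guid classID)
    {
        classID = new Guid("00000000-0000-0000-0000-000000000001");
    }
    public void InitNew() { }
    public void Load(IPropertyBag propertyBag, int errorLog) { }
    public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties) { }

    // IComponent: the method BizTalk calls for every message.
    public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
    {
        // The stamping logic goes here; this placeholder passes the message through.
        return pInMsg;
    }
}
```

Generate a real GUID for your own component before deploying it.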


First, build an assembly whose component implements the Execute method, as shown below. It builds and returns a new IBaseMessage:

 public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            int seq = 0;

            /// new message structure ///  
            IBaseMessageFactory msgFactory = pContext.GetMessageFactory(); 
            IBaseMessageContext context = pInMsg.Context;

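            IBaseMessagePart OriginalBodyPart = pInMsg.BodyPart;
            // Guard the dereference: BodyPart is null when the message has no body.
            Stream MessageStream = OriginalBodyPart != null ? OriginalBodyPart.GetOriginalDataStream() : null;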
            
            //// Constructing a new message ////

            IBaseMessage msg = msgFactory.CreateMessage();


            
            IPipelineContextEx pcx = pContext as IPipelineContextEx;

            if (OriginalBodyPart != null)
            {
                
                
                #region [Debugging/Logging]
                //XmlDocument xDocOutput = new XmlDocument();
                //xDocOutput.Load(MessageStream);
                //XmlTextWriter writer = new XmlTextWriter("d:\\temp\\message.xml", System.Text.Encoding.UTF8);
                //writer.Formatting = Formatting.Indented;
                //xDocOutput.WriteTo(writer);
                //writer.Flush();
                //writer.Close(); 
                #endregion
                
                if (pcx != null)
                {
                    ITransaction trans = null;
                    trans = (ITransaction)pcx.GetTransaction();

                    if (trans != null)
                    {
                        //// Get ConnectionString //// 

                        // Placeholder values: load the real connection string from configuration rather than hard-coding credentials.
                        SqlConnection conn = new SqlConnection("Database=SequenceDB;Server=<server>;connect timeout=60;user id=<user>;password=<password>");
                        SqlCommand cmd = new SqlCommand();
                        cmd.CommandType = CommandType.StoredProcedure;
                        cmd.CommandText = "GetSequence";
                        
                        SqlParameter parameter = new SqlParameter("@ReceiveLocationName", SqlDbType.NVarChar, 50);
                        parameter.Value = "TestReceiveLocation"; 
                        cmd.Parameters.Add(parameter);


                        ///////////////////////////////////////////
                        /// System.IO.File.AppendAllText("d:\\temp\\mqdata.log", MessageStream.ToString());
                        /////////////////////////////////////////////

                        try
                        {
                            cmd.Connection = conn;
                            conn.Open(); 

                            //// Get the Data /// 
                            seq = Convert.ToInt32(cmd.ExecuteScalar());


                            IBaseMessagePart part = msgFactory.CreateMessagePart();

                            XmlDocument xdoc = new XmlDocument();
                            xdoc.Load(MessageStream);

                            XmlElement newElement = xdoc.CreateElement("SequenceNo");
                            newElement.InnerText = seq.ToString();
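                            // Append to the root element; LastChild could be a trailing comment or processing instruction.
                            xdoc.DocumentElement.AppendChild(newElement);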

                            byte[] dataBytes = System.Text.Encoding.UTF8.GetBytes(xdoc.InnerXml);
                            MemoryStream stream = new MemoryStream();
                            stream.Write(dataBytes, 0, dataBytes.Length);

                            stream.Seek(0, SeekOrigin.Begin);

                            part.Data = stream;
                            msg.Context = context;

                            msg.AddPart("DMQMessageContractSend", part, true);

                            
                            // Also attach the sequence number as a separate, non-body part.
                            IBaseMessagePart ExtendedPart = msgFactory.CreateMessagePart();
                            byte[] extendedContent = System.Text.Encoding.UTF8.GetBytes(seq.ToString());
                            
                            ExtendedPart.Data = new MemoryStream(extendedContent);
                            ExtendedPart.Charset = "utf-8";
                            // The payload is a bare number, not an XML document.
                            ExtendedPart.ContentType = "text/plain";
                            msg.AddPart("Sequence", ExtendedPart, false);


                            #region [OldCodes]

                            ///  Stamp a sequence number to the message /// 

                            //XmlDocument xdoc = new XmlDocument();
                            //xdoc.Load(MessageStream);

                            //XmlElement newElement = xdoc.CreateElement("SequenceNo");
                            //newElement.InnerText = seq.ToString();
                            //xdoc.LastChild.AppendChild(newElement);

                            //byte[] dataBytes = System.Text.Encoding.UTF8.GetBytes(xdoc.InnerXml);
                            //MemoryStream stream = new MemoryStream();

                            //stream.Write(dataBytes, 0, dataBytes.Length);

                            //System.IO.File.AppendAllText("d:\\temp\\msmqlog.log", "somedata gotta be here" +  System.Text.Encoding.UTF8.GetString(dataBytes));

                            //stream.Seek(0, SeekOrigin.Begin);
                            //bodyPart.Data.Seek(0, SeekOrigin.Begin);
                            //OriginalBodyPart.Data = stream;

                            //IBaseMessage outMsg;
                            //outMsg = pContext.GetMessageFactory().CreateMessage();
                            //outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
                            //outMsg.BodyPart.Data = stream;

                            //IBaseMessage outMsg;
                            //pInMsg = pContext.GetMessageFactory().CreateMessage();
                            //pInMsg.AddPart("Sequence", pContext.GetMessageFactory().CreateMessagePart(), true);
                            //pInMsg.BodyPart.Data = stream;

                            //bodyPart.Data.Write(dataBytes, 0, dataBytes.Length);

                            //byte[] outputDataBytes = new byte[bodyPart.Data.Length]; 
                            //int xlength  = System.Convert.ToInt32(bodyPart.Data.Length);
                            //bodyPart.Data.Read(outputDataBytes, 0, xlength);

                            //System.IO.File.AppendAllText("d:\\temp\\scanstream.log", "Output " + System.Text.Encoding.UTF8.GetString(outputDataBytes));

                            //// Increment and Update the data //// 

                            // context.Promote("messageData", "", seq.ToString());
                            // context.Write("messageData", "", seq.ToString());


                            #endregion                            

                            
                            // Placeholder values: load the real connection string from configuration rather than hard-coding credentials.
                            SqlConnection SeqUpdateConn = new SqlConnection("Database=SequenceDB;Server=<server>;connect timeout=60;user id=<user>;password=<password>");

                            SqlCommand SeqUpdateCommand = new SqlCommand();
                            SeqUpdateCommand.CommandType = CommandType.StoredProcedure;
                            SeqUpdateCommand.CommandText = "UpdateSequence";

                            SqlParameter UpdateParameter = new SqlParameter("@ReceiveLocationName", SqlDbType.NVarChar, 50);
                            UpdateParameter.Value = "TestReceiveLocation";
                            SeqUpdateCommand.Parameters.Add(UpdateParameter);

                            SeqUpdateCommand.Connection = SeqUpdateConn;
                            SeqUpdateConn.Open();

                            SeqUpdateCommand.ExecuteNonQuery(); 


                            SeqUpdateCommand.Dispose();
                            SeqUpdateConn.Close();

                        }
                        catch (SqlException)
                        {
                            throw;
                        }
                        finally
                        {
                            cmd.Dispose();
                            conn.Close();
                        }

                    }
                }

            }


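            // Return the newly constructed message. If nothing was stamped
            // (no body part or no transaction), msg has no parts, so fall
            // back to the original message instead of returning an empty one.
            return msg.BodyPart != null ? msg : pInMsg;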


        }
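The stamping step can also be tried on its own, outside BizTalk. The sketch below is an illustration rather than part of the component above: the class and method names (SequenceStamper, Stamp) are invented for this example, and it depends only on the .NET base class library. It loads the XML from a stream, appends SequenceNo to the root element, and returns a rewound stream, which is the shape the body part's Data property expects.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical helper for trying the stamping logic in isolation.
public static class SequenceStamper
{
    // Appends <SequenceNo> to the root element and returns a stream
    // positioned at the start, ready to be assigned to a message part.
    public static Stream Stamp(Stream input, int sequenceNo)
    {
        var doc = new XmlDocument();
        doc.Load(input);

        // CreateElement(name) puts the element in the empty namespace,
        // which matches an XPath test of namespace-uri()=''.
        XmlElement element = doc.CreateElement("SequenceNo");
        element.InnerText = sequenceNo.ToString();
        doc.DocumentElement.AppendChild(element);

        var output = new MemoryStream();
        doc.Save(output);      // keeps the encoding declared by the document, if any
        output.Position = 0;   // rewind so the next reader starts at the beginning
        return output;
    }
}
```

Calling SequenceStamper.Stamp on a stream containing a DMQMessageContractSend document yields the same document with a SequenceNo child added as the last child of the root. Keeping this logic in a small helper makes it easy to unit test without a BizTalk installation.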

Once you have this code, create a new BizTalk Server project. In the Toolbox, use Choose Items to add your component so that it can be dragged onto your pipeline.

In your orchestration, you can read the sequence number with the following expression:

varStrSequenceNo = xpath(DMQMessage, "string(/*[local-name()='DMQMessageContractSend' and namespace-uri()='']/*[local-name()='SequenceNo' and namespace-uri()=''])");
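If you want to check the XPath expression before deploying, you can evaluate it against a sample document in plain .NET. This is only a sketch: the class and method names (SequenceXPathCheck, ReadSequenceNo) are made up for the example, and the sample XML is an assumption about what the stamped message looks like.

```csharp
using System;
using System.Xml;
using System.Xml.XPath;

// Hypothetical helper for testing the orchestration XPath outside BizTalk.
public static class SequenceXPathCheck
{
    // Same expression as the one used in the orchestration.
    public const string SequenceXPath =
        "string(/*[local-name()='DMQMessageContractSend' and namespace-uri()='']" +
        "/*[local-name()='SequenceNo' and namespace-uri()=''])";

    // Returns the sequence number text, or an empty string if no node matches.
    public static string ReadSequenceNo(string xml)
    {
        var doc = new XmlDocument();
        doc.LoadXml(xml);
        XPathNavigator navigator = doc.CreateNavigator();
        // string(...) makes Evaluate return a System.String.
        return (string)navigator.Evaluate(SequenceXPath);
    }
}
```

Note that the expression only matches elements in the empty namespace. If your schema declares a target namespace on the root element, the namespace-uri() tests need to be adjusted to match it.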







