XML Refinery BizTalk Pipeline Component

A simple BizTalk pipeline component to cleanse outbound messages. The end goal is for testers and administrators to be able to define fields to be removed from outbound messages without having to go back to the development team to have the maps changed. By configuring different send ports for different consumers, we can also filter the data down for each specific consumer.

The code below simply uses the power of XPATH (what else would we use?) to target a node set within our XML to remove. Using XPATH you can remove nodes from repeating groups, remove just the text node, remove the entire node, remove only nodes with a certain value, etc. Basically, any XPATH expression that results in a node set can be used, and that node set will be removed.

I’ve stored the XPATH statements in a config file for the sake of portability of the sample. The XPATHs are cached in memory for performance. Note that the demo version of the code uses XmlDocument, which may encounter memory consumption issues with large messages. User be warned: you have the source code, so upgrade at will…


XMLRefinery.cs


using System;
using System.IO;
using System.Xml;
using System.Linq;
using System.Resources;
using System.Reflection;
using System.Collections;
using System.Diagnostics;
using System.Globalization;
using System.ComponentModel;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;

namespace Integr8.BizTalk.PipelineComponents
{
    /// <summary>
    /// Functionality to remove selected nodes from an XML formatted IBaseMessage.
    /// </summary>
    /// <remarks>
    /// Each XPath expression listed in <see cref="DefinitionFile"/> is evaluated against the message
    /// and every node it selects is removed. The parsed definitions are cached in
    /// <see cref="System.Runtime.Caching.MemoryCache.Default"/> with a sliding expiry taken from
    /// <see cref="CacheExpirySeconds"/>. The whole message is loaded into an <see cref="XmlDocument"/>,
    /// so memory use grows with message size.
    /// </remarks>
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [System.Runtime.InteropServices.Guid("a46c154c-6ae6-ccc2-9c16-bf8900eb3435")]
    [ComponentCategory(CategoryTypes.CATID_Any)]
    public class XMLRefinery : Microsoft.BizTalk.Component.Interop.IComponent, IBaseComponent, IPersistPropertyBag, IComponentUI
    {
        #region Constants

        /// <summary>Sliding cache expiry (seconds) used when CacheExpirySeconds is missing or invalid.</summary>
        private const int DefaultCacheExpirySeconds = 60;

        /// <summary>Largest sliding expiration MemoryCache accepts; larger values make Set throw.</summary>
        private static readonly TimeSpan MaxSlidingExpiration = TimeSpan.FromDays(365);

        #endregion

        #region Variables

        private bool enabled = false;
        private string key = string.Empty;
        private string definitionFile = string.Empty;
        private string cacheExpirySeconds = string.Empty;

        // Process-wide cache shared by every instance of this component.
        private static readonly System.Runtime.Caching.ObjectCache cache = System.Runtime.Caching.MemoryCache.Default;

        #endregion

        #region Pipeline Properties

        [Description("If set to false the component will bypass processing.")]
        [DisplayName("Enabled")]
        public bool Enabled
        {
            get { return enabled; }
            set { enabled = value; }
        }

        [Description("The key for the set of XPATH expressions used to identify nodes to be removed at runtime.")]
        [DisplayName("Key")]
        public string Key
        {
            get { return key; }
            set { key = value; }
        }

        [Description("The path to the definition file that defines XPATHs to be used to remove nodes.")]
        [DisplayName("Definition File")]
        public string DefinitionFile
        {
            get { return definitionFile; }
            set { definitionFile = value; }
        }

        [Description("The number of seconds cached definition file values will expire and require refresh from file.")]
        [DisplayName("Cache Expiry Seconds")]
        public string CacheExpirySeconds
        {
            get { return cacheExpirySeconds; }
            set { cacheExpirySeconds = value; }
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Removes every node selected by the configured XPath definitions from the message body.
        /// </summary>
        /// <param name="pContext">Pipeline context; used to register the new body stream for cleanup.</param>
        /// <param name="pInMsg">The message to refine. Its body part data is replaced in place.</param>
        /// <returns>The same message instance, with a refined body when <see cref="Enabled"/> is true.</returns>
        /// <exception cref="Exception">The message body is not valid XML.</exception>
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            if (!Enabled)
                return pInMsg;

            #region Test Load the IBaseMessage to verify XML

            XmlDocument message = new XmlDocument();

            try
            {
                message.LoadXml(Helper.GetMessageBody(pInMsg));
            }
            catch (Exception xmlLoadException)
            {
                throw new Exception("The XML Refinery pipeline component only supports XML formatted IBaseMessage.  The message encountered was not valid XML.", xmlLoadException);
            }

            #endregion

            #region Refine XML

            foreach (string xpath in GetDefinitions().Values)
            {
                RemoveNode(message, xpath);
            }

            #endregion

            #region Reset message stream

            Stream memoryStream = new MemoryStream();
            message.Save(memoryStream);
            memoryStream.Seek(0, SeekOrigin.Begin);

            pInMsg.BodyPart.Data = memoryStream;
            // Let the pipeline dispose the stream once the message has been processed.
            pContext.ResourceTracker.AddResource(memoryStream);

            if (pInMsg.BodyPart.Data.Position > 0)
                pInMsg.BodyPart.Data.Seek(0, SeekOrigin.Begin);

            #endregion

            return pInMsg;
        }

        public void GetClassID(out Guid classID)
        {
            classID = new Guid("a46c154c-6ae6-ccc2-9c16-bf8900eb3435");
        }

        public void InitNew()
        {

        }

        /// <summary>Design-time validation; this component reports no errors.</summary>
        IEnumerator IComponentUI.Validate(object projectSystem)
        {
            return new ArrayList(0).GetEnumerator();
        }

        /// <summary>Loads the design-time properties; properties absent from the bag keep their current values.</summary>
        public void Load(IPropertyBag propertyBag, int errorLog)
        {
            object val = Helper.ReadPropertyBag(propertyBag, "Enabled");

            if (val != null)
                enabled = (bool)val;

            object val1 = Helper.ReadPropertyBag(propertyBag, "Key");

            if (val1 != null)
                key = (string)val1;

            object val2 = Helper.ReadPropertyBag(propertyBag, "DefinitionFile");

            if (val2 != null)
                definitionFile = (string)val2;

            object val3 = Helper.ReadPropertyBag(propertyBag, "CacheExpirySeconds");

            if (val3 != null)
                cacheExpirySeconds = (string)val3;
        }

        /// <summary>Persists the design-time properties to the property bag.</summary>
        public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
        {
            Helper.WritePropertyBag(propertyBag, "Enabled", (object)enabled);
            Helper.WritePropertyBag(propertyBag, "Key", (object)key);
            Helper.WritePropertyBag(propertyBag, "CacheExpirySeconds", (object)cacheExpirySeconds);
            Helper.WritePropertyBag(propertyBag, "DefinitionFile", (object)definitionFile);
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Removes every node selected by <paramref name="xpath"/> from <paramref name="message"/>.
        /// Element, text and attribute selections are all supported.
        /// </summary>
        private static void RemoveNode(XmlDocument message, string xpath)
        {
            // Materialise the selection before mutating the document: an XmlNodeList returned by
            // SelectNodes is not guaranteed to be valid once the underlying document changes.
            List<XmlNode> nodesToDelete = message.SelectNodes(xpath).Cast<XmlNode>().ToList();

            foreach (XmlNode nodeToDelete in nodesToDelete)
            {
                XmlAttribute attribute = nodeToDelete as XmlAttribute;

                if (attribute != null)
                {
                    // Attributes have no ParentNode; they must be removed via their owner element.
                    if (attribute.OwnerElement != null)
                        attribute.OwnerElement.Attributes.Remove(attribute);
                }
                else if (nodeToDelete.ParentNode != null)
                {
                    nodeToDelete.ParentNode.RemoveChild(nodeToDelete);
                }
            }
        }

        /// <summary>
        /// Returns the XPath definitions (key attribute => XPath) from the cache, loading them from
        /// <see cref="DefinitionFile"/> and caching them on a miss.
        /// </summary>
        private Dictionary<string, string> GetDefinitions()
        {
            // Include the definition file in the cache key so components that point at different
            // files (or leave Key blank) do not receive each other's expressions.
            string cacheKey = definitionFile + "|" + key;

            Dictionary<string, string> definitions = cache.Get(cacheKey) as Dictionary<string, string>;

            if (definitions != null)
                return definitions;

            System.Diagnostics.Trace.Write("In memory cache definition empty, refreshing cache from '" + definitionFile + "'.");

            definitions = LoadDefinitions(definitionFile);

            System.Runtime.Caching.CacheItemPolicy policy = new System.Runtime.Caching.CacheItemPolicy();
            policy.SlidingExpiration = GetSlidingExpiration();

            // Refresh the cache
            cache.Set(cacheKey, definitions, policy);

            return definitions;
        }

        /// <summary>
        /// Parses a definition file: each element child of the Definitions element contributes its
        /// "key" and "value" attributes. Non-element children (e.g. comments) are ignored.
        /// </summary>
        /// <exception cref="XmlException">The Definitions element or a required attribute is missing.</exception>
        /// <exception cref="ArgumentException">Two definitions share the same key.</exception>
        private static Dictionary<string, string> LoadDefinitions(string path)
        {
            XmlDocument definitionXml = new XmlDocument();
            definitionXml.Load(path);

            XmlNode root = definitionXml.SelectSingleNode("//Definitions");

            if (root == null)
                throw new XmlException("Definition file '" + path + "' does not contain a Definitions element.");

            Dictionary<string, string> definitions = new Dictionary<string, string>();

            foreach (XmlElement element in root.ChildNodes.OfType<XmlElement>())
            {
                XmlAttribute keyAttribute = element.Attributes["key"];
                XmlAttribute valueAttribute = element.Attributes["value"];

                if (keyAttribute == null || valueAttribute == null)
                    throw new XmlException("Definition file '" + path + "' contains a '" + element.Name + "' element without both 'key' and 'value' attributes.");

                definitions.Add(keyAttribute.Value, valueAttribute.Value);
            }

            return definitions;
        }

        /// <summary>
        /// Converts <see cref="CacheExpirySeconds"/> into a sliding expiration. A value of 0 means no
        /// sliding expiration; values that are not integers, are negative, or exceed one year fall
        /// back to <see cref="DefaultCacheExpirySeconds"/> (MemoryCache rejects the latter two).
        /// </summary>
        private TimeSpan GetSlidingExpiration()
        {
            int seconds;

            if (int.TryParse(cacheExpirySeconds, NumberStyles.Integer, CultureInfo.InvariantCulture, out seconds) && seconds >= 0)
            {
                TimeSpan expiry = TimeSpan.FromSeconds(seconds);

                if (expiry <= MaxSlidingExpiration)
                    return expiry;
            }

            System.Diagnostics.Trace.Write("CacheExpirySeconds property '" + cacheExpirySeconds + "' is not a valid integer between 0 and one year.  Defaulting cache expiry to 60 seconds.");
            return TimeSpan.FromSeconds(DefaultCacheExpirySeconds);
        }

        #endregion

        #region IComponentUI Properties

        [Browsable(false)]
        public string Description
        {
            get
            {
                return "XMLRefinery";
            }
        }

        [Browsable(false)]
        IntPtr IComponentUI.Icon
        {
            get
            {
                return IntPtr.Zero;
            }
        }

        [Browsable(false)]
        public string Name
        {
            get
            {
                return "XML Refinery";
            }
        }

        [Browsable(false)]
        public string Version
        {
            get
            {
                return Assembly.GetExecutingAssembly().GetName().Version.ToString(2);
            }
        }

        #endregion
    }
}

Helper.cs

using System;
using System.IO;
using System.Linq;
using System.Resources;
using System.Reflection;
using System.Collections;
using System.Diagnostics;
using System.Globalization;
using System.ComponentModel;
using System.Collections.Generic;
using Microsoft.BizTalk.Streaming;
using System.Runtime.InteropServices;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;

namespace Integr8.BizTalk.PipelineComponents
{
    /// <summary>
    /// Helper class for reusable functionality: property bag access, message body extraction,
    /// context property reads and message creation.
    /// </summary>
    [Serializable]
    internal class Helper
    {
        #region Internal Methods

        /// <summary>
        /// Reads <paramref name="propName"/> from <paramref name="propertyBag"/>.
        /// </summary>
        /// <returns>
        /// The stored value, or null when the bag throws <see cref="ArgumentException"/>
        /// (treated here as "property not present").
        /// </returns>
        internal static object ReadPropertyBag(IPropertyBag propertyBag, string propName)
        {
            object ptrVar = null;
            try
            {
                propertyBag.Read(propName, out ptrVar, 0);
            }
            catch (ArgumentException)
            {
                return ptrVar;
            }
            catch (System.Exception exception)
            {
                // Keep the original exception (type and stack trace) available as the inner exception.
                throw new System.Exception(exception.ToString(), exception);
            }
            return ptrVar;
        }

        /// <summary>
        /// Writes <paramref name="val"/> to <paramref name="propertyBag"/> under <paramref name="propName"/>.
        /// </summary>
        internal static void WritePropertyBag(IPropertyBag propertyBag, string propName, object val)
        {
            try
            {
                propertyBag.Write(propName, ref val);
            }
            catch (System.Exception exception)
            {
                // Keep the original exception (type and stack trace) available as the inner exception.
                throw new System.Exception(exception.ToString(), exception);
            }
        }

        /// <summary>
        /// Reads the whole message body as a string. A non-seekable original stream is wrapped in a
        /// <see cref="ReadOnlySeekableStream"/> and assigned back to the body part so it can be re-read.
        /// The stream is rewound to the start after reading.
        /// </summary>
        /// <exception cref="Exception">The message has no body data.</exception>
        internal static string GetMessageBody(IBaseMessage pInMsg)
        {
            System.IO.Stream sourceStream = pInMsg.BodyPart == null ? null : pInMsg.BodyPart.GetOriginalDataStream();

            // Guard before touching the stream; previously CanSeek was dereferenced first.
            if (sourceStream == null)
                throw new System.Exception("Cannot retrieve message body, body of message is empty");

            if (!sourceStream.CanSeek)
            {
                ReadOnlySeekableStream seekableStream = new ReadOnlySeekableStream(sourceStream);
                pInMsg.BodyPart.Data = seekableStream;
                sourceStream = pInMsg.BodyPart.Data;
            }

            if (pInMsg.BodyPart.Data == null)
                throw new System.Exception("Cannot retrieve message body, body of message is empty");

            sourceStream.Seek(0, System.IO.SeekOrigin.Begin);

            // Not disposed deliberately: disposing the reader would close the message stream.
            System.IO.StreamReader reader = new System.IO.StreamReader(sourceStream);
            string body = reader.ReadToEnd();

            // Leave the stream at the start for any later consumer of the body part.
            sourceStream.Seek(0, System.IO.SeekOrigin.Begin);

            return body;
        }

        /// <summary>
        /// Returns the string form of a context property, or an empty string when it is not set.
        /// </summary>
        internal static string GetContextProperty(IBaseMessage pInMsg, string propertyName, string propertyNamespace)
        {
            object value = pInMsg.Context.Read(propertyName, propertyNamespace);

            return value != null ? value.ToString() : string.Empty;
        }

        /// <summary>
        /// Creates a new message whose "Body" part contains <paramref name="xml"/>.
        /// </summary>
        internal static IBaseMessage CreateIBaseMessage(string xml, IPipelineContext pContext)
        {
            Stream recordStream = new MemoryStream();

            // Not disposed deliberately: disposing the writer would close the body stream.
            StreamWriter sw = new StreamWriter(recordStream);
            sw.Write(xml);
            sw.Flush();

            IBaseMessageFactory factory = pContext.GetMessageFactory();
            IBaseMessage message = factory.CreateMessage();
            message.AddPart("Body", factory.CreateMessagePart(), true);

            recordStream.Seek(0, System.IO.SeekOrigin.Begin);

            message.BodyPart.Data = recordStream;

            if (message.BodyPart.Data.Position > 0)
                message.BodyPart.Data.Seek(0, System.IO.SeekOrigin.Begin);

            return message;
        }

        #endregion
    }
}

Sample XML Definition File

<?xml version="1.0" encoding="utf-8" ?> 
<!--
  Each add element defines one XPath expression in its "value" attribute;
  every node that expression selects is removed from the outbound message.
  "key" values must be unique within the file (they are used as dictionary keys).
  Example 1 removes the text content of Supplier elements inside Order;
  example 2 removes Order elements whose quantity attribute is at least 1000.
-->
<Definitions>
    <add key="1" value="//Order/Supplier/text()" />
    <add key="2" value="//Order[@quantity>='1000']" />
</Definitions>
Advertisements
This entry was posted in BizTalk Server. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s