Zulfiqar's weblog

Middleware, security & random .Net

Archive for the ‘ServiceBusV2’ Category

Pub/Sub with WCF (Part 2)

Posted by zamd on May 25, 2011

Source Code Download

Service Bus May CTP has a small glitch when it comes to pub/sub messaging using the WCF programing model. The May CTP API out-of-box doesn’t pick up filter/promoted properties from the WCF data contracts and requires you to explicitly specify these properties on the BrokeredMessage object outside of core WCF programing model as shown in part 1.

I didn’t like this repetition and decided to prototype a solution using the WCF extensibility model and after few hours of coding created a solution which looks quite cool :)

In my solution a DataMember can be marked with [PromotedProperty] attribute and a custom operation behavior picks these annotations and promote them as filter properties by automatically attaching them with the outgoing message.

    public class Order
    {
public double Amount { get; set
; }
[PromotedProperty]
        public string ShipCity { get; set
; }
}

[ServiceContract]
public interface IOrderService
    {
[
OperationContract(Name = "SubmitFlat", IsOneWay = true
)]
[PropertyPromotionBehavior]
        void Submit(double amount, [PromotedProperty] string
shipCity);

[OperationContract(IsOneWay = true)]
[
PropertyPromotionBehavior
]
void Submit(Order order);
}

I have decided to use a custom formatter to implement property lifting and injection functionality primarily because at the formatter level I still have a fairly typed view of the method call. At message inspector level most of typed-ness has gone and it would have required more work.

The [PropertyPromotionBehavior] creates a ‘promotion model’ (list of properties needs to be promoted) by reflecting on the data contract. The ‘promotion model’ is then populated by the custom formatter with actual parameter values extracted from the call context. [PropertyPromotionBehavior] also replaces the default formatter with a custom PromotionFormatter which wraps the default formatter and does the additional work of property promotion. I have highlighted the relevant bits below.

    [AttributeUsage(AttributeTargets.Method)]

    publicclassPropertyPromotionBehaviorAttribute : Attribute, IOperationBehavior

    {

        publicvoid Validate(OperationDescription operationDescription) { }

 

        publicvoid ApplyDispatchBehavior(OperationDescription operationDescription, DispatchOperation dispatchOperation) { }

 

        publicvoid ApplyClientBehavior(OperationDescription operationDescription, ClientOperation clientOperation)

        {

            var promotedProperties = LoadPromotedProperties(operationDescription);

            if (promotedProperties.Count <= 0) return;

 

            var dummy = newClientOperation(clientOperation.Parent, “dummy”, “urn:dummy”);

            var behavior =

                operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>() asIOperationBehavior;

            behavior.ApplyClientBehavior(operationDescription, dummy);

 

            clientOperation.Formatter = newPromotionFormatter(dummy.Formatter, promotedProperties);

            clientOperation.SerializeRequest = dummy.SerializeRequest;

        }

 

        publicvoid AddBindingParameters(OperationDescription operationDescription, BindingParameterCollection bindingParameters) { }

 

       …

    }

    classPromotionFormatter : IClientMessageFormatter

    {

        privatereadonlyIClientMessageFormatter _orignalFormatter;

        privatereadonlyIList<PromotedPropertyDescription> _promotions;

 

        public PromotionFormatter(IClientMessageFormatter orignalFormatter, IList<PromotedPropertyDescription> promotions)

        {

            _orignalFormatter = orignalFormatter;

            _promotions = promotions;

        }

 

        publicobject DeserializeReply(Message message, object[] parameters)

        {

            return _orignalFormatter.DeserializeReply(message, parameters);

        }

 

        publicMessage SerializeRequest(MessageVersion messageVersion, object[] parameters)

        {

            var message = _orignalFormatter.SerializeRequest(messageVersion, parameters);

 

 

            if (_promotions.Count > 0)

            {

                var bmp = newBrokeredMessageProperty();

                foreach (var promotion in _promotions)

                {

                    bmp.Properties[promotion.Name] = promotion.Value;

                }

                message.Properties[BrokeredMessageProperty.Name] = bmp;

            }

                                                              

            return message;

        }

    }

With these extensions in place, the code looks just like the normal WCF code and property promotion happens in the background as you would expect.

var cf = new ChannelFactory<IOrderService>(serviceBusBinding, topicAddress); 
var proxy = cf.CreateChannel();

proxy.Submit(new Order { Amount = 200, ShipCity = “london” });

proxy.Submit(new Order { Amount = 322, ShipCity = “reading” });

proxy.Submit(101, “reading”);

Console.WriteLine(“Submitted.”);

Hopefully Service Bus programing model will support this kind of behavior soon but for the time being these extensions would probably fill the gap.

I have attached complete source code with this post so please feel free to download and use.

Posted in ServiceBusV2 | 2 Comments »

Pub/Sub with WCF (Part 1)

Posted by zamd on May 19, 2011

Code download

In yesterday’s post I have explored how to use Service Bus queues as a transport to communicate between a WCF client and a service. In today’s post I will show you WCF pub/sub messaging using the topics and subscriptions. A subscription behaves exactly like a queue for reads while a topic behaves exactly like a queue for writes. This metaphor maps nicely to WCF where our services would listen on different subscriptions while the clients would send messages to a single topic. Service Bus would automatically forward matching messages to correct services.

For this example, I have created a simple order service as shown below. The Write extension simply writes to the console using a certain color assigned to a particular host. I have used this to distinguish the service host receiving the messages.

public class Order
{
    public double Amount { get; set; }
    public string ShipCity { get; set; }
}

[ServiceContract]
public interface IOrderService
{
    [OperationContract(IsOneWay = true)]
    void Submit(Order order);
}

public class OrderService : IOrderService
{
    public void Submit(Order order)
    {
        var writer = OperationContext.Current.Host.Extensions.Find<Writer>();

        writer.WriteLine("Received order value = {0}, ShipCity = {1}", order.Amount, order.ShipCity);
    }
}

Next I use the Management API to create topics & related subscriptions.

var baseAddress = "sb://soundbyte.servicebus.appfabriclabs.com";
var credential = TransportClientCredentialBase.CreateSharedSecretCredential("owner", "zYDbQ2wM4343dbBukWJTF6Y=");
var namespaceClient = new ServiceBusNamespaceClient(baseAddress, credential);

try
{
    namespaceClient.DeleteTopic("orders");
}
catch { }

var topic= namespaceClient.CreateTopic("orders");
topic.AddSubscription("london", new SqlFilterExpression("ShipCity = 'london'"));
topic.AddSubscription("reading", new SqlFilterExpression("ShipCity = 'reading'"));

Both subscriptions has a filter applied to them and only the messages matching this filter would be delivered to the subscription. I then create two separate service hosts simulating separate services running in different cities. I assigned ‘Cyan’ color to the London host (simulated service hosted in London) and ‘Yellow’ to the Reading Host (Service hosted in Reading).

var serviceBusBinding = new ServiceBusMessagingBinding();
serviceBusBinding.MessagingFactorySettings.Credential = credential;

string topicAddress = baseAddress + "/orders";
string londonSubscription = baseAddress + "/orders/subscriptions/london";
string readingSubscription = baseAddress + "/orders/subscriptions/reading";

var hostLondon = new ServiceHost(typeof(OrderService));
hostLondon.Extensions.Add(new Writer(ConsoleColor.Cyan));
hostLondon.AddServiceEndpoint(typeof(IOrderService), serviceBusBinding, new Uri(topicAddress), new Uri(londonSubscription));
hostLondon.Open();

var hostReading = new ServiceHost(typeof(OrderService));
hostReading.Extensions.Add(new Writer(ConsoleColor.Yellow));
hostReading.AddServiceEndpoint(typeof(IOrderService), serviceBusBinding, new Uri(topicAddress), new Uri(readingSubscription));
hostReading.Open();

Next I have created a simple WCF client using the ChannelFactory API and used it to send messages to the topic. In the current CTP, you have to set the filter properties separately on the BrokeredMessage object, which is a native ServiceBus message object and is different from the WCF Message object.

ServiceBusMessagingBinding automatically converts the WCF message object (generated by the proxy object) to a BrokeredMessage object which is then sent to ServiceBus. On the receive side a similar conversion happens from BrokeredMessage to a WCF Message which is then on to WCF Service. To set BrokeredMessage specific properties, we need to create a BrokeredMessageProperty object, set the properties on it and add it to generated WCF messages. ServiceBusMessagingBinding looks for this property and copy all the properties to the BrokeredMessage object it creates. I’m doing exactly the same using the OperationContextScope below.

var cf = new ChannelFactory<IOrderService>(serviceBusBinding, topicAddress);
var proxy = cf.CreateChannel();

using (new OperationContextScope( (IContextChannel)  proxy))
{
    var bmp = new BrokeredMessageProperty();
    bmp.Properties["ShipCity"] = "london";
    OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, bmp);
    proxy.Submit(new Order {Amount = 200, ShipCity = "london"});

    bmp.Properties["ShipCity"] = "reading";
    proxy.Submit(new Order { Amount = 322, ShipCity = "london" });

}

As you can see from the following output that the ‘london’ message was received by London host (‘yellow’) while the ‘reading’ message was received by Reading (‘cyan’) service host.

image

Finally yes it looks bit ugly that I have to specify these promoted/filter properties separately Sad smilefrom their WCF contract values. Ideally ServiceBus APIs should have picked up the ShipCity property from my Order object.  Unfortunately in the current CTP you have to do this separately as I have shown you in the above snippet.  In the next post I’ll show how you can extend WCF so that it automatically picks these ‘filter properties’  from operations parameters or WCF DataContract(s).

Stay tuned…

Posted in ServiceBusV2 | 3 Comments »

WCF with Service Bus V2: Queues

Posted by zamd on May 18, 2011

May CTP of Service Bus introduced tons of new messaging capabilities including Queuing and Topic based pub/sub messaging. Check out Clemens post for an overview.

The usage of Service Bus messaging entities (Queues, Topics) is divided across two namespaces – a ‘Management Namespace’ and a ‘Runtime Namespace’. Management namespace is used to create/define messaging entities while the Runtime Namespace is used to do the actual messaging operations using the already defined ‘messaging entities’. The management namespace is exposed using as a REST service and Service Bus SDK also provides with a client library which hides the HTTP based interface behind a nice object oriented API.

So once a messaging entity (for example a Queue) is created and is ready you can use the Runtime API to send/receive messages. May CTP comes with two flavours of runtime APIs.

  • A low level MessagingFactory based API: Which is the native, high fidelity Service Bus API exposing all the Service Bus messaging features.
  • A higher level WCF binding: Which internally uses the MessagingFactory API and integrates the Service Bus messaging with WCF programing model.

There are already few blogs posts which talks about the MessagingFactory API so I’m not going to repeat that here. Let’s see how can I use Service Bus queues a communication mechanism between my WCF client & service. I’ll start with creating a usual one-way service as you would do for MSMQ or any other queuing technology.

    [ServiceContract]
    public interface IHelloService
    {
        [OperationContract(IsOneWay = true)]
        void SayHello(string input);
    }

    public class HelloService : IHelloService
    {
        public void SayHello(string input)
        {
            Console.WriteLine("Says: " + input);
        }
    }

 

Following snippet create a queue (line 6) using the management API. I’m using the wrapper class provided with the SDK for the REST based management service.

I then create a binding instance and specify ACS credentials on it. All the operations on Service Bus requires a valid token from ACS and these credentials would be used to acquire an ACS token for SB operations.

Finally I added an endpoint specifying the queue location as the endpoint address.

  1. static void Main(string[] args)
  2. {
  3.     const string baseAddress = "sb://soundbyte.servicebus.appfabriclabs.com";
  4.     var credential = TransportClientCredentialBase.CreateSharedSecretCredential("owner",
  5.                                                                                 "zYDbQ2wM1k7J32323232323VdbBukWJTF6Y=");
  6.     CreateQueue(baseAddress, credential,"q1");
  7.  
  8.     var serviceBusBinding = new ServiceBusMessagingBinding();
  9.     serviceBusBinding.MessagingFactorySettings.Credential = credential;
  10.  
  11.     var sh = new ServiceHost(typeof (HelloService));
  12.     sh.AddServiceEndpoint(typeof (IHelloService), serviceBusBinding, baseAddress + "/q1");
  13.     sh.Open();
  14.  
  15.  
  16.  
  17.     Console.WriteLine("Host ready…");
  18.  
  19.     var cf = new ChannelFactory<IHelloService>(serviceBusBinding, baseAddress + "/q1");
  20.     var proxy = cf.CreateChannel();
  21.     for (int i = 0; i < 10; i++)
  22.     {
  23.         proxy.SayHello("Hi " + i);
  24.     }
  25.  
  26.     Console.ReadLine();
  27. }

 

My client is a standard WCF proxy (line 19-23) which is sending messages to the same endpoint address (queue location) using the same ServiceBus binding.

The above produces following expected output. Easy isn’t it?

image

By integrating Service Bus messaging with WCF programing model, we can reuse all of the WCF goodness with Service bus messaging. For example I can secure messages while they are traveling through queues/topics by just changing the binding.

Next time I’ll show how to do pub/sub using the WCF programing model.

Posted in ServiceBusV2 | 2 Comments »

 
Follow

Get every new post delivered to your Inbox.