Messaging and Redis
The Reusability ServiceStack.UseCase contains a good introductory demo of using MQs (message queues) in ServiceStack, where the same services can be called via Web Service or via MQ. Using MQs provides instant response times, in addition to reliable and durable execution of your services.
The SMessage service
The SMessage service shows an example of dividing a service into multiple subtasks and dispatching them for instant response times and parallel processing of blocking operations without needing any multi-threading code in your application logic, e.g:
public object Any(SMessage request)
{
    var sw = Stopwatch.StartNew();
    if (!request.Defer) //Process sequentially or Defer execution
    {
        //Executes service sequentially: N+1 service calls, will timeout if N is too big
        var results = new List<SMessageReceipt>();
        results.AddRange(Email.Send(request));
        results.AddRange(Facebook.Send(request));
        results.AddRange(Twitter.Send(request));
        Db.InsertAll(results);
    }
    else
    {
        //Split service into smaller tasks and defers messages in MQ broker for processing in parallel
        Email.CreateMessages(request).ForEach(MessageProducer.Publish);
        Facebook.CreateMessages(request).ForEach(MessageProducer.Publish);
        Twitter.CreateMessages(request).ForEach(MessageProducer.Publish);
    }
    return new SMessageResponse {
        TimeTakenMs = sw.ElapsedMilliseconds,
    };
}
Another benefit of dispatching messages into multiple subtasks is that if the Email API is slow, it doesn't hold up the processing of the other tasks, which all run concurrently in the background, each processing its messages as fast as it can.
The RedisMqServer also supports spawning any number of background threads for individual requests, so if posting to Twitter is an IO-intensive operation, you can increase the throughput by simply assigning 2 or more worker threads, e.g:
mqService.RegisterHandler<PostStatusTwitter>(ServiceController.ExecuteMessage, noOfThreads:2);
mqService.RegisterHandler<CallFacebook>(ServiceController.ExecuteMessage);
mqService.RegisterHandler<EmailMessage>(ServiceController.ExecuteMessage);
Redis MQ is a Redis-based message queue client/server that can be hosted in any .NET or ASP.NET application. All Redis MQ Hosts live in the ServiceStack.Redis project and bring the many benefits of using a Message Queue.
RedisMqServer works by using a background thread for each service. This allows you to process messages from different services concurrently. It is recommended if you have any long-running services, so that other services can still run in parallel.
Major kudos goes to Redis which, thanks to its versatility, has Pub/Sub and List primitives that make implementing a Queue trivial.
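To give a feel for why Redis lists make a queue straightforward, here is a minimal sketch that uses the ServiceStack.Redis client directly. The queue name `mq:demo.inq` and the JSON payload are made up for illustration, it assumes a redis-server is reachable on localhost:6379, and it is not meant to mirror how RedisMqServer is implemented internally:

```csharp
using System;
using ServiceStack.Redis;

class RedisListQueueSketch
{
    static void Main()
    {
        // Hypothetical queue name, chosen only for this example.
        const string queueName = "mq:demo.inq";

        using (var redis = new RedisClient("localhost", 6379))
        {
            // Producer: push a serialized message onto the Redis list.
            redis.EnqueueItemOnList(queueName, "{\"Name\":\"ServiceStack\"}");

            // Consumer: block for up to 5 seconds waiting for the next message.
            var json = redis.BlockingDequeueItemFromList(queueName, TimeSpan.FromSeconds(5));
            Console.WriteLine(json ?? "(no message within timeout)");
        }
    }
}
```

Because the dequeue blocks on the Redis server, a consumer like this can wait for work without polling, which is the property an MQ host needs from its backing store.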
[Diagram: the logical architecture of how an MQ Publisher and MQ Host work together in ServiceStack]
All MQ implementations share the same IMessageService so they're easily swappable and testable. There is also an InMemoryTransientMessageService available, useful for development & testing.
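As a rough illustration of that swappability, the sketch below registers handlers against the IMessageService interface and plugs in the in-memory implementation. The `RegisterHandlers` helper and class name are hypothetical, and the exact point at which the transient service processes queued messages may differ between ServiceStack versions, so treat this as a starting point rather than a reference:

```csharp
using System;
using ServiceStack.Messaging;

public class Hello { public string Name { get; set; } }
public class HelloResponse { public string Result { get; set; } }

class InMemoryMqSketch
{
    // Hypothetical helper: registers the same handlers against any IMessageService.
    static void RegisterHandlers(IMessageService mqService)
    {
        mqService.RegisterHandler<Hello>(m =>
            new HelloResponse { Result = "Hello, " + m.GetBody().Name });

        mqService.RegisterHandler<HelloResponse>(m => {
            Console.WriteLine("Received: " + m.GetBody().Result);
            return null; // no further response to publish
        });
    }

    static void Main()
    {
        // Swap in a RedisMqServer here for a durable, Redis-backed queue.
        IMessageService mqService = new InMemoryTransientMessageService();
        RegisterHandlers(mqService);

        using (var mqClient = mqService.MessageFactory.CreateMessageQueueClient())
        {
            mqClient.Publish(new Hello { Name = "ServiceStack" });
        }

        mqService.Start(); // begin processing published messages
    }
}
```

Keeping handler registration behind the interface means tests can run without a redis-server while production code uses the Redis-backed host.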
These versions already sport the major features you've come to expect from an MQ:
- Each service maintains its own Standard and Priority MQs
- Automatic Retries on messages generating errors, with Failed messages sent to a DLQ (Dead Letter Queue) when the Retry threshold is reached.
- Each message can have a ReplyTo pointing to any Queue. Alternatively, you can provide a ServiceStack endpoint URL, which sends the response to a Web Service instead. If the web service is not available, it falls back to publishing the response in the default Response Queue so you never lose a message! MQ/Web Services that don't return any output have their Request DTOs sent to a rolling Out queue, which can be monitored by external services (i.e. the publisher/caller) to determine when the request has been processed.
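The retry and dead-letter behaviour in the list above can be pictured with a small self-contained sketch. It uses plain in-memory queues rather than Redis, all names (`InQ`, `OutQ`, `Dlq`, `Process`) are hypothetical, and it assumes `retryCount` means the number of retries after the first attempt; it illustrates the concept only and is not ServiceStack's implementation:

```csharp
using System;
using System.Collections.Generic;

class RetryDlqSketch
{
    // Hypothetical in-memory stand-ins for a service's queues.
    static readonly Queue<string> InQ = new Queue<string>();
    static readonly Queue<string> OutQ = new Queue<string>();
    static readonly Queue<string> Dlq = new Queue<string>();

    // Runs the handler, retrying up to retryCount times before giving up.
    static void Process(string message, Action<string> handler, int retryCount)
    {
        for (var attempt = 0; attempt <= retryCount; attempt++)
        {
            try
            {
                handler(message);
                OutQ.Enqueue(message); // processed, with no response to return
                return;
            }
            catch (Exception)
            {
                // Swallow the error and retry until the threshold is reached.
            }
        }
        Dlq.Enqueue(message); // retries exhausted: park the message in the DLQ
    }

    static void Main()
    {
        InQ.Enqueue("ok");
        InQ.Enqueue("always-fails");

        while (InQ.Count > 0)
        {
            Process(InQ.Dequeue(), msg => {
                if (msg == "always-fails") throw new InvalidOperationException(msg);
            }, retryCount: 2);
        }

        Console.WriteLine("out: " + OutQ.Count + ", dlq: " + Dlq.Count); // prints "out: 1, dlq: 1"
    }
}
```

A failed message ends up in the dead letter queue instead of being dropped, so it can be inspected or replayed later.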
Although you can host RedisMqServer in any ASP.NET web app, the benefit of hosting inside ServiceStack is that your web services are already capable of processing Redis MQ messages without any changes required since they're already effectively designed to work like a Message service to begin with, i.e. C# POCO-in -> C# POCO-out.
This is another example of how ServiceStack's prescribed DTO-first architecture continues to pay dividends. Each web service is a DI clean-room, allowing your C# logic to be kept pure as it only has to deal with untainted POCO DTOs, so the same web service can be re-used in SOAP, REST (JSON, XML, JSV, CSV, HTML) web services, as view models for dynamic HTML pages, and now as an MQ service!
Eventually (based on feedback) there will be posts/documentation/examples covering how to use it. In the meantime, you can check out the Messaging API to see how simple it is to use. To see some working code showing some of the capabilities listed above, view the tests.
Hooking up a basic send/reply example is as easy as:
//DTO messages:
public class Hello { public string Name { get; set; } }
public class HelloResponse { public string Result { get; set; } }
var redisFactory = new PooledRedisClientManager("localhost:6379");
var mqHost = new RedisMqServer(redisFactory, retryCount:2);
//Server - MQ Service Impl:
mqHost.RegisterHandler<Hello>(m =>
    new HelloResponse { Result = "Hello, " + m.GetBody().Name });
mqHost.Start();
...
//Client - Process Response:
mqHost.RegisterHandler<HelloResponse>(m => {
    Console.WriteLine("Received: " + m.GetBody().Result);
    return null; //No response to publish
});
mqHost.Start();
...
//Producer - Start publishing messages:
var mqClient = mqHost.CreateMessageQueueClient();
mqClient.Publish(new Hello { Name = "ServiceStack" });
Note: This is a quote of a google group topic to provide more information about ServiceStack and Redis until more documentation/examples are added.
Redis is a NoSQL datastore that runs as a network server. To start it you need to run an instance of redis-server either locally or remotely accessible.
It will probably help to understand the background concepts behind Redis so you can get a better idea of how it works. A good start is this brief overview of Redis, which also covers how to install it: https://stackoverflow.com/a/2760282/85785
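Once redis-server is running, a quick way to confirm that your .NET code can reach it is to send a PING from the ServiceStack.Redis client. This is a minimal sketch assuming the server listens on localhost:6379; adjust the host and port for a remote instance:

```csharp
using System;
using ServiceStack.Redis;

class RedisPingSketch
{
    static void Main()
    {
        // Connecting throws if no server is reachable at this address.
        using (var redis = new RedisClient("localhost", 6379))
        {
            Console.WriteLine(redis.Ping()
                ? "redis-server is reachable"
                : "no PONG received");
        }
    }
}
```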
The RedisMQ Host is in the ServiceStack.Redis project, which is a dependency of the ServiceStack NuGet package (if that's how you have ServiceStack installed).
At the moment there is not much documentation available; I will look to improve this after I get the next release of ServiceStack out.
You can look through the tests to get a better idea of how it works, especially the Request / Reply example: https://github.com/ServiceStack/ServiceStack.Redis/blob/master/tests/ServiceStack.Redis.Tests/RedisMqHostTests.cs#L293
Basically, RedisMQ runs as a separate background thread that you start from inside your AppHostHttpListenerBase. For duplex communication, each client and server requires its own AppHostHttpListenerBase + RedisMQ Host, although the client only needs an AppHostHttpListenerBase if your server is going to call HTTP services on it. Below is a simple example of what a client / server with HTTP + RedisMQ enabled would look like (note: it was written in a text editor, so there could be compile errors):
//Shared DTOs
public class Hello {
    public string Name { get; set; }
}
public class HelloResponse {
    public string Result { get; set; }
}
//Server
public class HelloService : Service
{
    public object Any(Hello req)
    {
        return new HelloResponse { Result = "Hello, " + req.Name };
    }
}
public class ServerAppHost : AppHostHttpListenerBase
{
    public ServerAppHost() : base ("Hello Server Services", typeof(HelloService).Assembly) {}
    public override void Configure(Container container)
    {
        base.Routes
            .Add<Hello>("/hello")
            .Add<Hello>("/hello/{Name}");
        var redisFactory = new PooledRedisClientManager("localhost:6379");
        container.Register<IRedisClientsManager>(redisFactory); // req. to log exceptions in redis
        var mqHost = new RedisMqServer(redisFactory, retryCount:2);
        //Server - MQ Service Impl:
        //Listens for 'Hello' messages sent with 'mqClient.Publish(new Hello { Name = "Client" });'
        mqHost.RegisterHandler<Hello>(m => {
            return this.ServiceController.ExecuteMessage(m);
        });
        mqHost.Start(); //Starts listening for messages
    }
}
var serverAppHost = new ServerAppHost();
serverAppHost.Init();
serverAppHost.Start("https://serverhost:81"); //Starts HttpListener listening on 81
Console.ReadLine(); //Block the server from exiting (i.e. if running inside Console App)
//Client
public class ClientAppHost : AppHostHttpListenerBase
{
    public ClientAppHost() : base ("Hello Client Services", typeof(HelloService).Assembly) {}
    public override void Configure(Container container)
    {
        var redisFactory = new PooledRedisClientManager("redishost:6379");
        var mqHost = new RedisMqServer(redisFactory, retryCount:2);
        //Client - MQ Service Impl:
        //Listens for 'HelloResponse' as returned by the 'Hello' service on the server
        mqHost.RegisterHandler<HelloResponse>(m => {
            Console.WriteLine("Received: " + m.GetBody().Result); //Fired
            return null; //No response to publish
        });
        //or to call an existing service with:
        //mqHost.RegisterHandler<HelloResponse>(m =>
        //    this.ServiceController.ExecuteMessage(m));
        mqHost.Start(); //Starts listening for messages
    }
}
var clientAppHost = new ClientAppHost();
clientAppHost.Init();
clientAppHost.Start("https://clienthost:82"); //Starts HttpListener listening on 82
Console.ReadLine(); //Block the client from exiting (i.e. if running inside Console App)
With the above configuration the client can talk to the server with:
var mqClient = mqHost.CreateMessageQueueClient();
mqClient.Publish(new Hello { Name = "Client 1" });
The first RedisMQ host listening to the 'Hello' message (i.e. the server) will receive the message. When the server returns a 'HelloResponse' message it gets put on the 'HelloResponse' Inbox Queue and the first RedisMQ Host listening to the 'HelloResponse' message (i.e. Client) will have their callback fired.
However, MQs are normally used for async OneWay messages, because any client host listening to 'HelloResponse' could receive the response message. Only one will, and it is not guaranteed to be the client that sent the original message. For this reason, you want to make use of the ReplyTo field of the Message for Request/Reply responses.
var uniqueCallbackQ = "mq:c1" + ":" + Guid.NewGuid().ToString("N");
var clientMsg = new Message<Hello>(new Hello { Name = "Client 1" }) {
    ReplyTo = uniqueCallbackQ
};
mqClient.Publish(clientMsg);
var response = mqClient.Get(uniqueCallbackQ, null).ToMessage<HelloResponse>(); //Blocks thread on client until reply message is received on the ReplyTo queue
Note: The ReplyTo callback url could also be a ServiceStack endpoint that expects a 'HelloResponse' e.g.
var clientMsg = new Message<Hello>(new Hello { Name = "Client 1" }) {
    ReplyTo = "https://clienthost:82/helloresponse"
};
In this case the server won't put the response message back on the MQ and will instead send the response directly to your client web service host.
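For the URL form of ReplyTo to work, the client host needs a web service that accepts a 'HelloResponse'. The following is a hedged sketch of what that could look like; the `HelloResponseService` class name is hypothetical, the `/helloresponse` route is taken from the ReplyTo URL above, and the `ServiceStack.ServiceInterface` namespace assumes a ServiceStack v3-era install (later versions moved `Service` to the `ServiceStack` namespace):

```csharp
using System;
using ServiceStack.ServiceInterface;

public class HelloResponse
{
    public string Result { get; set; }
}

// Hypothetical client-side service that receives replies sent to the
// ReplyTo URL instead of a reply queue.
public class HelloResponseService : Service
{
    public object Any(HelloResponse response)
    {
        Console.WriteLine("Received via HTTP: " + response.Result);
        return null; // nothing to send back to the server
    }
}

// Inside ClientAppHost.Configure(), map the route used in ReplyTo:
//   base.Routes.Add<HelloResponse>("/helloresponse");
```

The client AppHost would also need to be constructed with the assembly containing this service so that it gets registered.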
There are a few MQ concepts covered above. I invite you to do your own reading on MQs in general, as they work a little differently from normal request/reply web services.
See Also: Redis Commands (PDF)