This repository was archived by the owner on Oct 28, 2022. It is now read-only.
Implements an easy-to-use sockets API based on IObservable. It allows a simple protocol to be implemented in a few lines, such as this client:
var client = new ReactiveClient("127.0.0.1", 1055);

// Messages are parsed with a simple Rx query over the receiver observable.
// The protocol has a fixed 4-byte header containing the payload length,
// followed by the message payload itself. Bytes are consumed from
// client.Receiver automatically, so its behavior is intuitive.
IObservable<string> messages =
    from header in client.Receiver.Buffer(4)
    let length = BitConverter.ToInt32(header.ToArray(), 0)
    let body = client.Receiver.Take(length)
    select Encoding.UTF8.GetString(body.ToEnumerable().ToArray());

// Finally, subscribe on a background thread to process messages as they become available.
messages.SubscribeOn(TaskPoolScheduler.Default).Subscribe(message => Console.WriteLine(message));

client.ConnectAsync().Wait();
Creating the server implementation is equally straightforward (this is an echo server for the same message format):
var server = new ReactiveListener(1055);

server.Connections.Subscribe(socket =>
{
    // Same parsing query as the client: 4-byte length header, then the UTF-8 payload.
    IObservable<string> messages =
        from header in socket.Receiver.Buffer(4)
        let length = BitConverter.ToInt32(header.ToArray(), 0)
        let body = socket.Receiver.Take(length)
        select Encoding.UTF8.GetString(body.ToEnumerable().ToArray());

    // Echo the incoming message with the same format.
    messages.Subscribe(message =>
    {
        var body = Encoding.UTF8.GetBytes(message);
        var header = BitConverter.GetBytes(body.Length);
        var payload = header.Concat(body).ToArray();

        socket.SendAsync(payload).Wait();
    });
});

server.Start();
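
Both examples rely on the same wire format: a 4-byte length header followed by a UTF-8 payload. As a minimal sketch of that framing on its own, the helper below encodes and decodes a single frame using only the .NET base class library. The `LengthPrefixedFrame` class and its method names are hypothetical and are not part of this library; the sketch also assumes both ends use the same byte order, since `BitConverter` follows the platform's endianness.

```csharp
using System;
using System.Linq;
using System.Text;

// Hypothetical helper for illustration only; not part of the library's API.
public static class LengthPrefixedFrame
{
    private const int HeaderSize = sizeof(int);

    // Builds a frame: 4-byte payload length (platform byte order) followed by the UTF-8 payload.
    public static byte[] Encode(string message)
    {
        byte[] body = Encoding.UTF8.GetBytes(message);
        byte[] header = BitConverter.GetBytes(body.Length);
        return header.Concat(body).ToArray();
    }

    // Reads one complete frame and returns its payload as a string.
    public static string Decode(byte[] frame)
    {
        if (frame == null || frame.Length < HeaderSize)
            throw new ArgumentException("Frame is shorter than the 4-byte header.", nameof(frame));

        int length = BitConverter.ToInt32(frame, 0);
        if (length < 0 || frame.Length - HeaderSize < length)
            throw new ArgumentException("Header length does not match the available payload.", nameof(frame));

        return Encoding.UTF8.GetString(frame, HeaderSize, length);
    }
}
```

Note that the header carries the payload's byte count, not the string's character count, so multi-byte UTF-8 characters are framed correctly. The array returned by `Encode` is built the same way as the payload the echo server passes to `socket.SendAsync`.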