How to set up a pub/sub system, by Steven Lacerda (Morgan Hill, CA)

Steven Lacerda
Aug 15, 2018

When I started this project, I was perplexed by how little information addressed my exact needs for setting up a publish/subscribe mechanism. My situation required React on the client side, with C# and websocket-sharp on the server side.

So I’m writing this for anyone else who runs into similar issues.

CLIENT SIDE

Client-side setup was interesting because a single client could subscribe to the same channel more than once, with a different callback executed for each subscription. That meant I would not only have several callbacks, but I would also have to keep track of them. Here’s the end result on the client side.

Note: socket is the variable that holds my socket connection. You should have something similar: when you open the connection, save it in a variable. You can then call these functions as socket.FUNCTION_NAME().
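The functions below lean on a few pieces this post never shows: `_channels`, `_getNextId`, and `_send`. Here is a minimal sketch of what such a wrapper might look like, assuming `_send` serializes messages to JSON and `_getNextId` is a simple counter. `createSocket` and the `transport` argument are hypothetical names used only for illustration.

```javascript
// Hypothetical wrapper: `transport` is anything with a send(string) method,
// such as a browser WebSocket instance.
function createSocket(transport) {
  return {
    _channels: {}, // channel name -> array of { id, callback }
    _nextId: 0,

    // Assumed: a simple incrementing counter for message IDs.
    _getNextId: function _getNextId() {
      this._nextId += 1;
      return String(this._nextId);
    },

    // Assumed: messages go over the wire as JSON strings.
    _send: function _send(message) {
      transport.send(JSON.stringify(message));
    }
  };
}

// Stand-in transport that records what would have been sent.
var sent = [];
var socket = createSocket({ send: function (data) { sent.push(data); } });

socket._send({
  msg: 'method',
  method: 'pubsub.subscribe',
  channel: 'user.delete',
  id: socket._getNextId()
});
console.log(sent[0]);
```

In the browser you would pass a real WebSocket as the transport and attach subscribe, publish, and the rest to the returned object.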

Subscribe stores each user’s subscriptions as:

this._channels = {
  CHANNEL_NAME: [
    { id: '3492-323-234343-1234', callback: SOME_FUNCTION },
    // etc…
  ],
  ANOTHER_CHANNEL_NAME: [
    { id: '3492-323-234343-1234', callback: SOME_FUNCTION },
    // etc…
  ],
}

With this format, we can store several callbacks for the same channel, and the unique ID tells us which callback belongs to which subscription. Notice that subscribe returns the unique ID, so store it somewhere in your client code.

subscribe: function subscribe(channel, callback) {
  var self = this;
  var id = self._getNextId();               // ID for the outgoing message
  var randomId = self.randomIdGenerator();  // unique ID for this subscription
  var channelObj = {
    id: randomId,
    callback: callback
  };
  // Create the channel's callback list on first use, then add this subscription.
  if (!self._channels[channel]) {
    self._channels[channel] = [];
  }
  self._channels[channel].push(channelObj);
  self._send({
    msg: 'method',
    method: 'pubsub.subscribe',
    channel: channel,
    id: id
  });
  return randomId;
}

We also need a resubscribe function so that, if the server goes down, the client can restore its subscriptions. Call it whenever the socket disconnects and then reconnects.

resubscribe: function resubscribe() {
  var self = this;
  var id = self._getNextId();
  self._send({
    msg: 'method',
    method: 'pubsub.resubscribe',
    channel: Object.keys(self._channels), // every channel we currently track
    id: id
  });
}
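One way to wire this up is to call resubscribe from the connection’s open handler whenever a connection is re-established. The sketch below assumes a browser-style WebSocket with onopen and onclose handlers, and assumes `_send` writes to `socket._connection`. `connectWithRetry`, `createConnection`, and the fixed retry delay are illustrative choices, not part of the code above.

```javascript
// `createConnection` is a hypothetical factory, e.g.
//   function () { return new WebSocket(url); }
// `socket` is the object that carries resubscribe().
function connectWithRetry(createConnection, socket, delayMs) {
  var hasConnectedBefore = false;

  function connect() {
    var connection = createConnection();
    // Assumed: socket._send() writes to socket._connection.
    socket._connection = connection;

    connection.onopen = function () {
      // The first open has nothing to restore; later opens are reconnects.
      if (hasConnectedBefore) {
        socket.resubscribe();
      }
      hasConnectedBefore = true;
    };

    connection.onclose = function () {
      // Try again after a fixed delay.
      setTimeout(connect, delayMs);
    };

    return connection;
  }

  return connect();
}
```

A fixed delay keeps the sketch short; a production client would probably back off between attempts.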

You should also unsubscribe from your channels when you’re done, which is where the unique ID comes into play: it identifies which callback to remove.

unsubscribe: function unsubscribe(channel, uniqueId) {
  var self = this;
  var id = self._getNextId();
  self._send({
    msg: 'method',
    method: 'pubsub.unsubscribe',
    channel: channel,
    id: id
  });
  let channelArray = self._channels[channel] || [];
  let index = -1;
  for (let i = 0; i < channelArray.length; i++) {
    if (channelArray[i].id === uniqueId) {
      index = i;
      break;
    }
  }
  // Only splice when a match was found; index 0 is a valid match.
  if (index === -1) {
    return false;
  }
  channelArray.splice(index, 1);
  return true;
}
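Because several callbacks can share one channel, one option is to notify the server only when the last callback for a channel is removed; otherwise the remaining callbacks may stop receiving messages. This is a sketch of that variation rather than the version above, and `removeSubscription` is a hypothetical helper that works on a plain channels object.

```javascript
// Removes one subscription by ID and reports what happened:
//   removed:      whether a matching subscription was found
//   channelEmpty: whether the channel has no callbacks left
function removeSubscription(channels, channel, uniqueId) {
  var list = channels[channel] || [];
  var index = list.findIndex(function (sub) { return sub.id === uniqueId; });
  if (index === -1) {
    return { removed: false, channelEmpty: list.length === 0 };
  }
  list.splice(index, 1);
  if (list.length === 0) {
    // Drop the key so Object.keys(channels) stays accurate for resubscribe.
    delete channels[channel];
  }
  return { removed: true, channelEmpty: list.length === 0 };
}

var channels = {
  'user.delete': [
    { id: 'a', callback: function () {} },
    { id: 'b', callback: function () {} }
  ]
};
var first = removeSubscription(channels, 'user.delete', 'a');
var second = removeSubscription(channels, 'user.delete', 'b');
console.log(first.channelEmpty, second.channelEmpty); // false true
```

Inside unsubscribe, you would then send the pubsub.unsubscribe message only when channelEmpty is true.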

And, to publish to the channel:

publish: function publish(channel, params) {
  var self = this;
  var id = self._getNextId();
  self._send({
    msg: 'method',
    method: 'pubsub.publish',
    channel: channel,
    params: params,
    id: id
  });
}

And, here’s our random id generator:

randomIdGenerator: function randomIdGenerator() {
  var now = Date.now();
  // Replace each x with a random hex digit.
  var first = 'xxxxxxxx-xxxx'.replace(/x/g, function() {
    return (Math.random() * 16 | 0).toString(16);
  });
  return `${ first }-${ now }`;
}

socket.randomIdGenerator(); // returns something like 12345678-1234-1234567891234

Call these socket functions from your components:

const userDeleteId = socket.subscribe('user.delete', function() {
  // do something
});

// After a disconnect and reconnect, this is enough to restore subscriptions.
socket.resubscribe();

socket.unsubscribe('user.delete', userDeleteId);

socket.publish('user.delete', { extra: 'payload', data: 'here' });

SERVER SIDE

On the server, just keep a list of the channels and who is subscribed. When the client sends a socket.subscribe('user.delete') request, add the user’s info (something unique; in my case it was an ID) to a Dictionary<string, List<string>>, which ends up looking like { 'user.delete': ['admin', 'test'] }.

When socket.publish('user.delete', { some: 'data' }) comes in, you just forward it to all the subscribing sessions. So, you would look up the sessions in SessionManager (or something similar) and forward the data and channel to 'admin' and 'test'.
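The bookkeeping described above is small enough to sketch. The server in this post is C#, so the JavaScript below only illustrates the logic; `ChannelRegistry` and the `deliver` callback are hypothetical names standing in for the subscription dictionary and the session lookup.

```javascript
// In-memory registry: channel name -> set of subscriber IDs.
class ChannelRegistry {
  constructor() {
    this.channels = new Map();
  }

  subscribe(channel, subscriberId) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set());
    }
    this.channels.get(channel).add(subscriberId);
  }

  unsubscribe(channel, subscriberId) {
    var subscribers = this.channels.get(channel);
    if (!subscribers) return;
    subscribers.delete(subscriberId);
    if (subscribers.size === 0) this.channels.delete(channel);
  }

  // Forwards a payload to every subscriber of the channel.
  // `deliver(subscriberId, message)` stands in for the session lookup and send.
  publish(channel, params, deliver) {
    var subscribers = this.channels.get(channel) || new Set();
    subscribers.forEach(function (subscriberId) {
      deliver(subscriberId, { channel: channel, params: params });
    });
    return subscribers.size;
  }
}

var registry = new ChannelRegistry();
registry.subscribe('user.delete', 'admin');
registry.subscribe('user.delete', 'test');

var delivered = [];
registry.publish('user.delete', { some: 'data' }, function (id, message) {
  delivered.push(id + ':' + message.channel);
});
console.log(delivered); // [ 'admin:user.delete', 'test:user.delete' ]
```

Using a set per channel means a subscriber who subscribes twice is still only sent each message once.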

Your client will receive a message that contains the channel and any additional payload, which you can use in your components.
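On the client, handling that message means looking up the channel in `_channels` and running every stored callback. The sketch below assumes the server sends JSON shaped like `{ channel, params }`; that wire format and the `dispatch` name are assumptions, so adjust them to whatever your server emits.

```javascript
// Runs every callback registered for the channel named in the message.
// Returns the number of callbacks that were invoked.
function dispatch(channels, rawMessage) {
  var message = JSON.parse(rawMessage);
  // Copy the array so a callback that unsubscribes does not disturb the loop.
  var snapshot = (channels[message.channel] || []).slice();
  snapshot.forEach(function (subscription) {
    subscription.callback(message.params);
  });
  return snapshot.length;
}

var received = [];
var channels = {
  'user.delete': [
    { id: 'id-1', callback: function (params) { received.push('first:' + params.data); } },
    { id: 'id-2', callback: function (params) { received.push('second:' + params.data); } }
  ]
};

dispatch(channels, JSON.stringify({ channel: 'user.delete', params: { data: 'here' } }));
console.log(received); // [ 'first:here', 'second:here' ]
```

With a browser WebSocket, this would typically be called from the onmessage handler with event.data as the raw message.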

I would show you this code, but implementations will diverge here, and it’s not a great idea for me to expose my company’s server code, so I will leave this part up to you.

Don’t be a prisoner to your code, master it!!!

Solving a problem is as good as those sexual feelings…sometimes, okay, maybe not. Straight from Morgan Hill, CA. Good luck!

— by Steven Lacerda
