If you missed it – here is Part 1 of the article, the one where I research the tools we need to build a Node.js clustered app with autodiscovery.
Let’s continue, then. First of all – you can find the application on github here: https://github.com/PavelPolyakov/nodejs-clustered-app-with-autodiscovery . There you can also find the setup guide and try to launch the cluster yourself; you don’t need much, just four Vagrant machines.
The application below is a demo example of a Node.js clustered app with autodiscovery. I’m sure it’s possible to build other setups, and probably some already exist, but this is the one I came up with.
Setup structure, Vagrant
Here is the structure of the setup.
My application consists of two major parts:
- etcd server. The server which is our key-value storage; the nodes register themselves there, while the other nodes read the list of the registered nodes and this way know which nodes are already in the cluster.
It’s important to state that in this setup etcd is launched as a single node, not as a cluster. This is done only because it’s a demo – you shouldn’t do this in production.
The etcd server is run using Vagrant; it’s installed according to the instructions from the provision.sh file. Besides the server, we install the etcd-browser on the machine, so it’s possible to track the keys visually.
- node-app. The node app is an example of the Node.js clustered app. The main goal of the app is to:
- start
- register itself in the etcd server
- find out if there are any nodes in the cluster
- join cluster
- do some job
In this particular app the master node is going to send tasks to the worker nodes. Worker nodes do the job and return the results.
It’s possible to launch several Vagrant machines from one Vagrantfile, so we will launch three machines – A, B and C – which will form the cluster.
Besides, there is one important configuration option for the node-app – the IP of the etcd cluster server. It should be filled in to the config/default.js file, like this:

```javascript
module.exports = {
    "port": 22222,
    "ttl": 120,
    "dateFormat": "YYYY-MM-DD HH:mm:ss",
    "etcd": ["172.28.128.26:4001"]
};
```
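As a side note – since I warned above against running a single etcd node in production, it’s worth knowing that node-etcd accepts an array of hosts (the config above already passes one). A minimal sketch, assuming a hypothetical three-node etcd cluster (the extra IPs are made up):

```javascript
const Etcd = require('node-etcd');

// point the client at several etcd peers instead of a single node
// (172.28.128.27/28 are hypothetical - use your own cluster addresses)
const etcd = new Etcd([
    '172.28.128.26:4001',
    '172.28.128.27:4001',
    '172.28.128.28:4001'
], {timeout: 1000});
```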
The app
I need to say that, for better understanding, it would be best if you checked out the codebase from github, followed the setup instructions and ran it. But I don’t think you will do that at this very moment – so continue reading 🙂
Here is the require block:
```javascript
'use strict';

const os = require('os');
const ifaces = os.networkInterfaces();
const config = require('config');
const _ = require('lodash');
const uuid = require('node-uuid');
const Etcd = require('node-etcd');
const Promise = require('bluebird');
const Discover = require('node-discover');
const moniker = require('moniker');
const moment = require('moment');
const debug = {
    app: require('debug')('node-app'),
    master: require('debug')('master'),
    cluster: require('debug')('cluster'),
    worker: require('debug')('worker')
};

const DATE_FORMAT = config.get('dateFormat');

// helper function, returns our own external IP (the one which makes us reachable from the host PC)
function _getIP() {
    return _.filter(ifaces['eth1'], (record) => {
        return record.family === 'IPv4';
    }).pop().address;
}
```
Things to mention here:
- moniker library – allows us to get Heroku-like hostnames
- _getIP method – allows us to get our real external IP by filtering the network interfaces which are provided by the Node.js os module.
- node-discover library – allows us to form the cluster, is responsible for master election and allows us to send messages through the cluster.
- node-etcd library – a Node.js implementation of the etcd API, allows us to interact with the etcd server.
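To illustrate the first item: here is roughly how a generated node id looks in practice (a sketch – the actual name and uuid differ on every run):

```javascript
const moniker = require('moniker');
const uuid = require('node-uuid');

// a Heroku-like name glued to a uuid,
// e.g. "hollow-breeze-0f7b2c3a-..." (the output varies per run)
const id = `${moniker.choose()}-${uuid.v4()}`;
console.log(id);
```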
Here is the part where the app starts:
```javascript
// init and support etcd interaction
Promise.coroutine(function *() {
    const etcd = new Etcd(config.get('etcd'), {timeout: 1000});
    let IP = _getIP();
    let currentServices = [];

    debug.app(`my IP: ${IP}`);
    debug.app(`etcd addresses: ${config.get('etcd')}`);

    // write ourselves to the registry of the available services
    yield Promise.promisify(etcd.set, {context: etcd})(`services/${IP}`, `${IP}`, {ttl: config.get('ttl')});

    // each 60 seconds - show that we are not dead
    setInterval(() => {
        Promise.promisify(etcd.set, {context: etcd})(`services/${IP}`, `${IP}`, {ttl: config.get('ttl')});
    }, config.get('ttl') / 2 * 1000);

    // watch the /services directory, to act on updates
    // explicitly set the watcher to observe starting from index 1
    let watcher = etcd.watcher('services', 1, {recursive: true});

    // we should track each change
    watcher.on('change', () => {
        /* some important code */
    });
})().catch((error) => {
    require('debug')('error')(error.toString());
    process.exit();
});
```
When we start the app, the first thing we do is connect to the etcd server. There we register ourselves under the key services/${IP} with a ttl of 120 seconds and set up an interval which re-registers the node every 60 seconds. The purpose is: if some node crashes or fails, it is removed from the cluster within 120 seconds, while a healthy node continuously re-registers itself. At the end we subscribe the node to all of the changes inside the services directory, so each time a node is registered there or removed from there – we are notified.
Next – let’s have a closer look at the watcher callback:
```javascript
// we should track each change
watcher.on('change', () => {
    Promise.coroutine(function *() {
        // get the list of the services available after the update
        let services = yield Promise.promisify(etcd.get, {context: etcd})('services');
        let updatedServices = _.pluck(services.node.nodes, 'value');

        // if the list of the new services is not equal to the one we stored - reinit the discovery service
        if (!_.isEqual(currentServices.sort(), updatedServices.sort())) {
            // check if it's the initial change
            if (currentServices.length === 0) {
                currentServices = updatedServices;

                // init the discovery for the first time
                debug.app('Init the appNode');
                appNode.init(currentServices);
            } else {
                currentServices = updatedServices;
                debug.app(`new: ${JSON.stringify(currentServices, 'value')}`);

                // call the appropriate method of the appNode
                debug.app('Notifying the appNode about the change');
                appNode.change(currentServices);
            }
        }
    })();
});
```
Each time the directory is changed, we first read all of the keys from it. Then we compare them with the array of the keys we already have (or don’t have yet) and decide whether it’s the first launch of the node or the change happened during the regular lifecycle.
If it was the initial launch – we call the init method of the node app; if the callback was called during the regular lifecycle – we call the change method. As you have already guessed – the appNode is an entity which represents the logic of our app.
The appNode looks the following way:
```javascript
// app node object
let appNode = {
    id: `${moniker.choose()}-${uuid.v4()}`,
    d: undefined, // holder for the discovery service
    intervals: {}, // holder for the possible intervals

    /**
     * init the discovery service
     * @param {Array} nodes array of the unicast IPs
     */
    init: function (nodes) {
        /* some important code */
    },

    /**
     * stop all the current interaction and init another discovery service
     * @param {Array} nodes array of the unicast IPs, would be passed further
     */
    change: function (nodes) {
        /* some important code */
    }
};
```
It consists of:
- id – the unique id of the node
- d – reserved to hold the node-discover instance
- intervals – the object to hold the possible intervals
- init – the method which is called when the node has just started
- change – the method which is called when the list of the nodes in the cluster has been updated
Now, let’s take a closer look at the init method:
```javascript
/**
 * init the discovery service
 * @param {Array} nodes array of the unicast IPs
 */
init: function (nodes) {
    this.d = new Discover({port: config.get('port'), address: _getIP(), unicast: nodes});
    this.d.broadcast.instanceUuid = this.id; // redefine the node id, make it prettier

    debug.worker(`My id: ${this.id}`);

    // add advertisement information
    this.d.advertise({});

    // what to do if this node was elected as master
    this.d.on("promotion", (obj) => {
        /* some important code */
    });

    // what to do if this node is not master anymore
    this.d.on("demotion", (obj) => {
        /* some important code */
    });

    // what to do if another node was added
    this.d.on("added", (obj) => {
        /* some important code */
    });

    // what to do if another node was removed
    this.d.on("removed", (obj) => {
        /* some important code */
    });

    // interval which prints the currently available nodes every 60 seconds
    this.intervals.nodesInfo = setInterval(() => {
        /* some important code */
    }, 60 * 1000);

    // join the tasks channel, by default
    this.d.join('tasks', (data) => {
        /* some important code */
    });

    // join the results channel, by default
    this.d.join('results', (data) => {
        /* some important code */
    });
}
```
What happens there: finally we call the Discover constructor and pass it the list of the IPs which we took from the etcd server. Then we subscribe to several events, all of which are provided by the node-discover library (a minimal sketch of possible handler bodies follows the list):
- promotion – called when the node becomes master
- demotion – called when the node is not the master anymore and a new one was elected
- added – called when a new node joins the cluster
- removed – called when a node leaves the cluster
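The handler bodies are elided above as /* some important code */; here is a minimal sketch of what the added/removed handlers could look like (my reconstruction, not the exact code from the repo):

```javascript
// hypothetical bodies for the elided handlers above
this.d.on("added", (obj) => {
    debug.cluster(`node joined the cluster: ${obj.id}`);
});

this.d.on("removed", (obj) => {
    debug.cluster(`node left the cluster: ${obj.id}`);
});
```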
Besides, we join two channels – tasks and results. To join a channel, in node-discover terminology, means to subscribe to the messages which go to that channel. You do not need to join a channel if you only need to send messages to it.
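A minimal sketch of that distinction, assuming d is a node-discover instance like this.d above (the channel name is made up):

```javascript
// a receiver has to join the channel to get its messages
d.join('heartbeat', (data) => {
    console.log('received:', data);
});

// a sender just calls send() - no join required
d.send('heartbeat', {alive: true});

// leave() unsubscribes again
d.leave('heartbeat');
```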
Now, let’s see what the master does:
```javascript
// what to do if this node was elected as master
this.d.on("promotion", (obj) => {
    debug.master(`I was promoted to a master. ${this.id}`);

    // leave the tasks channel
    this.d.leave('tasks');

    this.intervals.master = setInterval(() => {
        // do not generate messages in case there are no nodes to process
        if (_.keys(this.d.nodes).length === 0) {
            debug.master(`${moment().format(DATE_FORMAT)} no nodes, skip task generation :(`);
            return;
        }

        /**
         * Load balancer, should return the node which would execute the task
         * @param nodes
         */
        function balance(nodes) {
            let keys = _.keys(nodes);

            // choose a random node as the executor
            let random = _.random(0, keys.length - 1);

            return nodes[keys[random]];
        }

        // each task is unique, but we choose one executor
        let task = {
            uuid: uuid.v4(),
            executor: balance(this.d.nodes).id,
            number: _.random(30, 60)
        };

        this.d.send('tasks', task);
        debug.master(`${moment().format(DATE_FORMAT)} task was sent:`);
        debug.master(task);
    }, 10 * 1000);
});
```
In this app, the master is the node which comes up with the tasks and sends them to the tasks channel; the other nodes are considered workers – they listen to the tasks channel, pick the tasks from there and execute them.
The first thing the master does is leave the tasks channel, because its job is to send tasks there, not to listen for them. Then we set the interval – every 10 seconds the node generates a task, which should be executed by an executor. The executor is defined by the load balancer – the balance method. In our example we put a random number from 30 to 60 into the task object. The goal of the worker node is to multiply it by 3 and return the result.
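Picking a random node is the simplest balancing strategy; if you wanted a more even distribution, a round-robin version would be an easy swap (a sketch, not part of the original code):

```javascript
// a hypothetical round-robin alternative to the random balance() above
let nextIndex = 0;

function balanceRoundRobin(nodes) {
    let keys = _.keys(nodes);

    // rotate through the nodes instead of choosing randomly
    nextIndex = (nextIndex + 1) % keys.length;

    return nodes[keys[nextIndex]];
}
```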
Let’s see what the worker node does, then:
```javascript
// join the tasks channel, by default
this.d.join('tasks', (data) => {
    // do not do anything, if we are master
    if (this.d.me.isMaster) {
        return;
    }

    // this message is not for us
    if (data.executor !== this.id) {
        return;
    }

    debug.worker(`${moment().format(DATE_FORMAT)} got message:`);
    debug.worker(data);

    let startMoment = moment();

    // do some complex Math, the calculation could last from 1 to 15 seconds
    setTimeout(() => {
        let finishMoment = moment();
        let result = {uuid: data.uuid, result: data.number * 3};

        debug.worker(`${moment().format(DATE_FORMAT)} message was processed with the result:`);
        debug.worker(result);

        this.d.send('results', _.merge(result, {
            start: startMoment.format(DATE_FORMAT),
            finish: finishMoment.format(DATE_FORMAT)
        }));
    }, _.random(1, 15) * 1000);
});
```
This callback is called each time a new message arrives in the tasks channel. The node then checks whether it’s the master – if yes (in theory, this shouldn’t happen) – it should not take the task. If the task was not meant for us – we do not take it either. But if we are the executor – we wait from 1 to 15 seconds and then put the result object into the results channel. The job is done then.
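The results handler is also hidden above as /* some important code */; a plausible master-side version could look like this (my sketch, the real code lives in the repo):

```javascript
// join the results channel - a hypothetical handler body
this.d.join('results', (data) => {
    // only the master is interested in the results
    if (!this.d.me.isMaster) {
        return;
    }

    debug.master(`${moment().format(DATE_FORMAT)} got result:`);
    debug.master(data); // {uuid, result, start, finish}
});
```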
OK, that was it! I think I have explained everything; the only thing left is the demo.
Below is the video which shows the app in action:
In this article, I have described my findings regarding the topic of a Node.js clustered app with autodiscovery. I can’t say that this is the best way to build a clustered app; however, I think this knowledge would be a good starting point for those who are interested.