From d2cce4f166dd7a3b6bd5dab3f396e7cb9e72c561 Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sat, 15 Oct 2016 12:36:20 -0700 Subject: [PATCH] Work on auto reloading partition map from redis --- package.json | 1 + src/partition/partitionconfig.js | 12 ---- src/partition/partitiondecider.js | 18 ++++-- src/partition/partitionmap.js | 67 ++++++++++++++++++++++ src/partition/partitionmodule.js | 24 +++++++- src/partition/redispartitionmapreloader.js | 54 +++++++++++++++++ src/server.js | 23 +++----- 7 files changed, 164 insertions(+), 35 deletions(-) create mode 100644 src/partition/partitionmap.js create mode 100644 src/partition/redispartitionmapreloader.js diff --git a/package.json b/package.json index 9cbf6187..ff1f2026 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "socket.io-redis": "^1.0.0", "source-map-support": "^0.4.0", "status-message-polyfill": "git://github.com/calzoneman/status-message-polyfill", + "toml": "^2.3.0", "uuid": "^2.0.1", "yamljs": "^0.1.6" }, diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index b9ec4cdd..b3ccd46d 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -3,18 +3,6 @@ class PartitionConfig { this.config = config; } - getPartitionMap() { - return this.config.partitions; - } - - getOverrideMap() { - return this.config.overrides; - } - - getPool() { - return this.config.pool; - } - getIdentity() { return this.config.identity; } diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js index 0f6226cc..a93668f3 100644 --- a/src/partition/partitiondecider.js +++ b/src/partition/partitiondecider.js @@ -1,24 +1,26 @@ import { murmurHash1 } from '../util/murmur'; class PartitionDecider { - constructor(config) { + constructor(config, partitionMap) { this.config = config; + this.partitionMap = partitionMap; } getPartitionForChannel(channel) { - const partitionMap = this.config.getPartitionMap(); - return partitionMap[this.getPartitionIdentityForChannel(channel)]; + return this.partitionMap.getPartitions()[this.getPartitionIdentityForChannel(channel)]; } getPartitionIdentityForChannel(channel) { channel = channel.toLowerCase(); - const overrideMap = this.config.getOverrideMap(); + const overrideMap = this.partitionMap.getOverrides(); if (overrideMap.hasOwnProperty(channel)) { return overrideMap[channel]; - } else { - const pool = this.config.getPool(); + } else if (this.partitionMap.getPool().length > 0) { + const pool = this.partitionMap.getPool(); const i = murmurHash1(channel) % pool.length; return pool[i]; + } else { + return { servers: [] }; } } @@ -26,6 +28,10 @@ class PartitionDecider { return this.getPartitionIdentityForChannel(channel) === this.config.getIdentity(); } + + setPartitionMap(newMap) { + this.partitionMap = newMap; + } } export { PartitionDecider }; diff --git a/src/partition/partitionmap.js b/src/partition/partitionmap.js new file mode 100644 index 00000000..2d7a3c96 --- /dev/null +++ b/src/partition/partitionmap.js @@ -0,0 +1,67 @@ +import crypto from 'crypto'; +import fs from 'fs'; +import toml from 'toml'; + +function sha256(input) { + var hash = crypto.createHash('sha256'); + hash.update(input); + return hash.digest('base64'); +} + +class PartitionMap { + /** + * @param {Map} partitions Map of node ids to io configs + * @param {Array} pool List of available nodes + * @param {Map} overrides Overrides for node assignment + */ + constructor(partitions, pool, overrides) { + this.partitions = partitions; + this.pool = pool; + this.overrides = overrides || {}; + this._hash = sha256(JSON.stringify(this.partitions) + + JSON.stringify(this.pool) + + JSON.stringify(this.overrides)); + } + + getHash() { + return this._hash; + } + + getPartitions() { + return this.partitions; + } + + getPool() { + return this.pool; + } + + getOverrides() { + return this.overrides; + } + + toJSON() { + return { + partitions: this.partitions, + pool: this.pool, + overrides: this.overrides, + hash: this._hash + }; + } + + static fromJSON(json) { + return new PartitionMap(json.partitions, json.pool, json.overrides); + } + + static fromFile(filename) { + const rawData = fs.readFileSync(filename).toString('utf8'); + const parsed = toml.parse(rawData); + + return PartitionMap.fromJSON(parsed); + } + + static empty() { + return new PartitionMap({}, [], {}); + } +} + +export { PartitionMap }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 6c3b61cb..42639b53 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -7,6 +7,7 @@ import logger from 'cytube-common/lib/logger'; import LegacyConfig from '../config'; import path from 'path'; import { AnnouncementRefresher } from './announcementrefresher'; +import { RedisPartitionMapReloader } from './redispartitionmapreloader'; const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', 'partitions.toml'); @@ -23,13 +24,13 @@ class PartitionModule { initConfig() { logger.initialize(null, null, LegacyConfig.get('debug')); try { - this.partitionConfig = this.loadPartitionMap(); + this.partitionConfig = this.loadPartitionConfig(); } catch (error) { process.exit(1); } } - loadPartitionMap() { + loadPartitionConfig() { try { return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH); } catch (error) { @@ -44,9 +45,26 @@ class PartitionModule { } } + getPartitionMapReloader() { + if (!this.partitionMapReloader) { + const redisProvider = this.getRedisClientProvider(); + this.partitionMapReloader = new RedisPartitionMapReloader( + redisProvider.get(), // Client for GET partitionMap + redisProvider.get()); // Subscribe client + } + + return this.partitionMapReloader; + } + getPartitionDecider() { if (!this.partitionDecider) { - this.partitionDecider = new PartitionDecider(this.partitionConfig); + const reloader = this.getPartitionMapReloader(); + this.partitionDecider = new PartitionDecider(this.partitionConfig, + reloader.getPartitionMap()); + reloader.on('partitionMapChange', newMap => { + this.partitionDecider.setPartitionMap(newMap); + require('../server').getServer().handlePartitionMapChange(); + }); } return this.partitionDecider; diff --git a/src/partition/redispartitionmapreloader.js b/src/partition/redispartitionmapreloader.js new file mode 100644 index 00000000..114a94ba --- /dev/null +++ b/src/partition/redispartitionmapreloader.js @@ -0,0 +1,54 @@ +import { PartitionMap } from './partitionmap'; +import logger from 'cytube-common/lib/logger'; +import { EventEmitter } from 'events'; + +class RedisPartitionMapReloader extends EventEmitter { + constructor(redisClient, subClient) { + super(); + this.redisClient = redisClient; + this.subClient = subClient; + this.partitionMap = PartitionMap.empty(); + redisClient.once('ready', () => this.reload()); + subClient.once('ready', () => this.subscribe()); + } + + subscribe() { + this.subClient.subscribe('partitionMap'); + this.subClient.on('message', (channel, message) => { + if (channel !== 'partitionMap') { + logger.warn('RedisPartitionMapReloader received unexpected message ' + + `on redis channel ${channel}`); + return; + } + + this.reload(); + }); + } + + reload() { + this.redisClient.getAsync('partitionMap').then(result => { + var newMap = null; + try { + newMap = PartitionMap.fromJSON(JSON.parse(result)); + } catch (error) { + logger.error(`Failed to decode received partition map: ${error}`, + { payload: result }); + return; + } + + if (this.partitionMap.getHash() !== newMap.getHash()) { + logger.info(`Partition map changed (hash=${newMap.getHash()})`); + this.partitionMap = newMap; + this.emit('partitionMapChange', newMap); + } + }).catch(error => { + logger.error(`Failed to retrieve partition map from redis: ${error}`); + }); + } + + getPartitionMap() { + return this.partitionMap; + } +} + +export { RedisPartitionMapReloader }; diff --git a/src/server.js b/src/server.js index c5d629ae..e00cff90 100644 --- a/src/server.js +++ b/src/server.js @@ -333,20 +333,7 @@ Server.prototype.shutdown = function () { }); }; -Server.prototype.reloadPartitionMap = function () { - if (!Config.get("enable-partition")) { - return; - } - - var config; - try { - config = this.initModule.loadPartitionMap(); - } catch (error) { - return; - } - - this.initModule.partitionConfig.config = config.config; - +Server.prototype.handlePartitionMapChange = function () { const channels = Array.prototype.slice.call(this.channels); Promise.map(channels, channel => { if (channel.dead) { @@ -375,3 +362,11 @@ Server.prototype.reloadPartitionMap = function () { Logger.syslog.log("Partition reload complete"); }); }; + +Server.prototype.reloadPartitionMap = function () { + if (!Config.get("enable-partitions")) { + return; + } + + this.initModule.getPartitionMapReloader().reload(); +};