diff --git a/package.json b/package.json index d30f43f4..797cb1e3 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "socket.io-redis": "^1.0.0", "source-map-support": "^0.4.0", "status-message-polyfill": "git://github.com/calzoneman/status-message-polyfill", + "toml": "^2.3.0", "uuid": "^2.0.1", "yamljs": "^0.1.6" }, diff --git a/src/partition/partitioncli.js b/src/partition/partitioncli.js new file mode 100644 index 00000000..8a97e18b --- /dev/null +++ b/src/partition/partitioncli.js @@ -0,0 +1,83 @@ +import { PartitionModule } from './partitionmodule'; +import { PartitionMap } from './partitionmap'; +import fs from 'fs'; + +const partitionModule = new PartitionModule(); +partitionModule.cliMode = true; + +function savePartitionMap(filename) { + const reloader = partitionModule.getPartitionMapReloader(); + reloader.once('partitionMapChange', map => { + var toml = 'pool = [\n'; + map.getPool().forEach((poolEntry, i) => { + toml += ` '${poolEntry}'`; + if (i < map.getPool().length - 1) { + toml += ','; + } + + toml += '\n'; + }); + toml += ']\n\n'; + + const partitions = map.getPartitions(); + Object.keys(partitions).forEach(identity => { + partitions[identity].servers.forEach(serverDef => { + toml += `[[partitions.${identity}.servers]]\n`; + toml += `url = '${serverDef.url}'\n`; + toml += `secure = ${serverDef.secure}\n`; + toml += '\n'; + }); + }); + + toml += '[overrides]\n'; + const overrides = map.getOverrides(); + Object.keys(overrides).forEach(channel => { + toml += `${channel} = '${overrides[channel]}'\n`; + }); + + fs.writeFileSync(filename, toml); + console.log(`Wrote partition map to ${filename}`); + process.exit(0); + }); +} + +function loadPartitionMap(filename) { + var newMap; + + try { + newMap = PartitionMap.fromFile(filename); + } catch (error) { + console.error(`Failed to load partition map from ${filename}: ${error}`); + console.error(error.stack); + process.exit(1); + } + + const client = partitionModule.getRedisClientProvider().get(); + const config = partitionModule.partitionConfig; + client.once('ready', () => { + client.multi() + .set(config.getPartitionMapKey(), JSON.stringify(newMap)) + .publish(config.getPublishChannel(), new Date().toISOString()) + .execAsync() + .then(result => { + console.log(`Result: ${result}`); + console.log(`Published new partition map from ${filename}`); + process.exit(0); + }).catch(error => { + console.error(`Failed to publish partition map: ${error}`); + console.error(error.stack); + process.exit(1); + }); + }); +} + +if (process.argv[2] === 'save') { + savePartitionMap(process.argv[3]); +} else if (process.argv[2] === 'load') { + loadPartitionMap(process.argv[3]); +} else { + console.error('Usage: ' + process.argv[0] + ' ' + process.argv[1] + ' '); + console.error(' "save" downloads the partition map and saves it to the specified file'); + console.error(' "load" loads the partition map from the specified file and publishes it'); + process.exit(1); +} diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index b9ec4cdd..e2e3523b 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -3,18 +3,6 @@ class PartitionConfig { this.config = config; } - getPartitionMap() { - return this.config.partitions; - } - - getOverrideMap() { - return this.config.overrides; - } - - getPool() { - return this.config.pool; - } - getIdentity() { return this.config.identity; } @@ -22,6 +10,14 @@ class PartitionConfig { getRedisConfig() { return this.config.redis; } + + getPublishChannel() { + return this.config.redis.publishChannel; + } + + getPartitionMapKey() { + return this.config.redis.partitionMapKey; + } } export { PartitionConfig }; diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js index 0f6226cc..a93668f3 100644 --- a/src/partition/partitiondecider.js +++ b/src/partition/partitiondecider.js @@ -1,24 +1,26 @@ import { murmurHash1 } from '../util/murmur'; class PartitionDecider { - constructor(config) { + constructor(config, partitionMap) { this.config = config; + this.partitionMap = partitionMap; } getPartitionForChannel(channel) { - const partitionMap = this.config.getPartitionMap(); - return partitionMap[this.getPartitionIdentityForChannel(channel)]; + return this.partitionMap.getPartitions()[this.getPartitionIdentityForChannel(channel)]; } getPartitionIdentityForChannel(channel) { channel = channel.toLowerCase(); - const overrideMap = this.config.getOverrideMap(); + const overrideMap = this.partitionMap.getOverrides(); if (overrideMap.hasOwnProperty(channel)) { return overrideMap[channel]; - } else { - const pool = this.config.getPool(); + } else if (this.partitionMap.getPool().length > 0) { + const pool = this.partitionMap.getPool(); const i = murmurHash1(channel) % pool.length; return pool[i]; + } else { + return { servers: [] }; } } @@ -26,6 +28,10 @@ class PartitionDecider { return this.getPartitionIdentityForChannel(channel) === this.config.getIdentity(); } + + setPartitionMap(newMap) { + this.partitionMap = newMap; + } } export { PartitionDecider }; diff --git a/src/partition/partitionmap.js b/src/partition/partitionmap.js new file mode 100644 index 00000000..d3a96330 --- /dev/null +++ b/src/partition/partitionmap.js @@ -0,0 +1,83 @@ +import crypto from 'crypto'; +import fs from 'fs'; +import toml from 'toml'; + +function sha256(input) { + var hash = crypto.createHash('sha256'); + hash.update(input); + return hash.digest('base64'); +} + +class PartitionMap { + /** + * @param {Map} partitions Map of node ids to io configs + * @param {Array} pool List of available nodes + * @param {Map} overrides Overrides for node assignment + */ + constructor(partitions, pool, overrides) { + this.partitions = partitions; + this.pool = pool; + this.overrides = overrides || {}; + this._hash = sha256(JSON.stringify(this.partitions) + + JSON.stringify(this.pool) + + JSON.stringify(this.overrides)); + } + + getHash() { + return this._hash; + } + + getPartitions() { + return this.partitions; + } + + getPool() { + return this.pool; + } + + getOverrides() { + return this.overrides; + } + + toJSON() { + return { + partitions: this.partitions, + pool: this.pool, + overrides: this.overrides, + hash: this._hash + }; + } + + static fromJSON(json) { + if (json === null) { + throw new Error('Cannot construct PartitionMap: input is null'); + } else if (typeof json !== 'object') { + throw new Error(`Cannot construct PartitionMap from input "${json}" of type ` + + typeof json); + } else if (!json.partitions || typeof json.partitions !== 'object') { + throw new Error('Cannot construct PartitionMap: field partitions must be ' + + `an object but was "${json.partitions}"`); + } else if (!json.overrides || typeof json.overrides !== 'object') { + throw new Error('Cannot construct PartitionMap: field overrides must be ' + + `an object but was "${json.overrides}"`); + } else if (!json.pool || !Array.isArray(json.pool)) { + throw new Error('Cannot construct PartitionMap: field pool must be ' + + `an array but was "${json.pool}"`); + } + + return new PartitionMap(json.partitions, json.pool, json.overrides); + } + + static fromFile(filename) { + const rawData = fs.readFileSync(filename).toString('utf8'); + const parsed = toml.parse(rawData); + + return PartitionMap.fromJSON(parsed); + } + + static empty() { + return new PartitionMap({}, [], {}); + } +} + +export { PartitionMap }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 6c3b61cb..52a74bab 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -7,6 +7,7 @@ import logger from 'cytube-common/lib/logger'; import LegacyConfig from '../config'; import path from 'path'; import { AnnouncementRefresher } from './announcementrefresher'; +import { RedisPartitionMapReloader } from './redispartitionmapreloader'; const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', 'partitions.toml'); @@ -14,6 +15,7 @@ const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', class PartitionModule { constructor() { this.initConfig(); + this.cliMode = false; } onReady() { @@ -23,13 +25,13 @@ class PartitionModule { initConfig() { logger.initialize(null, null, LegacyConfig.get('debug')); try { - this.partitionConfig = this.loadPartitionMap(); + this.partitionConfig = this.loadPartitionConfig(); } catch (error) { process.exit(1); } } - loadPartitionMap() { + loadPartitionConfig() { try { return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH); } catch (error) { @@ -44,9 +46,29 @@ class PartitionModule { } } + getPartitionMapReloader() { + if (!this.partitionMapReloader) { + const redisProvider = this.getRedisClientProvider(); + this.partitionMapReloader = new RedisPartitionMapReloader( + this.partitionConfig, + redisProvider.get(), // Client for GET partitionMap + redisProvider.get()); // Subscribe client + } + + return this.partitionMapReloader; + } + getPartitionDecider() { if (!this.partitionDecider) { - this.partitionDecider = new PartitionDecider(this.partitionConfig); + const reloader = this.getPartitionMapReloader(); + this.partitionDecider = new PartitionDecider(this.partitionConfig, + reloader.getPartitionMap()); + reloader.on('partitionMapChange', newMap => { + this.partitionDecider.setPartitionMap(newMap); + if (!this.cliMode) { + require('../server').getServer().handlePartitionMapChange(); + } + }); } return this.partitionDecider; diff --git a/src/partition/redispartitionmapreloader.js b/src/partition/redispartitionmapreloader.js new file mode 100644 index 00000000..a4eddc94 --- /dev/null +++ b/src/partition/redispartitionmapreloader.js @@ -0,0 +1,56 @@ +import { PartitionMap } from './partitionmap'; +import logger from 'cytube-common/lib/logger'; +import { EventEmitter } from 'events'; + +class RedisPartitionMapReloader extends EventEmitter { + constructor(config, redisClient, subClient) { + super(); + this.config = config; + this.redisClient = redisClient; + this.subClient = subClient; + this.partitionMap = PartitionMap.empty(); + redisClient.once('ready', () => this.reload()); + subClient.once('ready', () => this.subscribe()); + } + + subscribe() { + this.subClient.subscribe(this.config.getPublishChannel()); + this.subClient.on('message', (channel, message) => { + if (channel !== this.config.getPublishChannel()) { + logger.warn('RedisPartitionMapReloader received unexpected message ' + + `on redis channel ${channel}`); + return; + } + + logger.info(`Received partition map update message published at ${message}`); + this.reload(); + }); + } + + reload() { + this.redisClient.getAsync(this.config.getPartitionMapKey()).then(result => { + var newMap = null; + try { + newMap = PartitionMap.fromJSON(JSON.parse(result)); + } catch (error) { + logger.error(`Failed to decode received partition map: ${error}`, + { payload: result }); + return; + } + + if (this.partitionMap.getHash() !== newMap.getHash()) { + logger.info(`Partition map changed (hash=${newMap.getHash()})`); + this.partitionMap = newMap; + this.emit('partitionMapChange', newMap); + } + }).catch(error => { + logger.error(`Failed to retrieve partition map from redis: ${error}`); + }); + } + + getPartitionMap() { + return this.partitionMap; + } +} + +export { RedisPartitionMapReloader }; diff --git a/src/server.js b/src/server.js index c5d629ae..99e67059 100644 --- a/src/server.js +++ b/src/server.js @@ -333,20 +333,7 @@ Server.prototype.shutdown = function () { }); }; -Server.prototype.reloadPartitionMap = function () { - if (!Config.get("enable-partition")) { - return; - } - - var config; - try { - config = this.initModule.loadPartitionMap(); - } catch (error) { - return; - } - - this.initModule.partitionConfig.config = config.config; - +Server.prototype.handlePartitionMapChange = function () { const channels = Array.prototype.slice.call(this.channels); Promise.map(channels, channel => { if (channel.dead) { @@ -375,3 +362,11 @@ Server.prototype.reloadPartitionMap = function () { Logger.syslog.log("Partition reload complete"); }); }; + +Server.prototype.reloadPartitionMap = function () { + if (!Config.get("enable-partition")) { + return; + } + + this.initModule.getPartitionMapReloader().reload(); +};