diff --git a/src/partition/partitioncli.js b/src/partition/partitioncli.js index d7d2e0ca..8a97e18b 100644 --- a/src/partition/partitioncli.js +++ b/src/partition/partitioncli.js @@ -53,10 +53,11 @@ function loadPartitionMap(filename) { } const client = partitionModule.getRedisClientProvider().get(); + const config = partitionModule.partitionConfig; client.once('ready', () => { client.multi() - .set('partitionMap', JSON.stringify(newMap)) - .publish('partitionMap', new Date().toISOString()) + .set(config.getPartitionMapKey(), JSON.stringify(newMap)) + .publish(config.getPublishChannel(), new Date().toISOString()) .execAsync() .then(result => { console.log(`Result: ${result}`); diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index b3ccd46d..e2e3523b 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -10,6 +10,14 @@ class PartitionConfig { getRedisConfig() { return this.config.redis; } + + getPublishChannel() { + return this.config.redis.publishChannel; + } + + getPartitionMapKey() { + return this.config.redis.partitionMapKey; + } } export { PartitionConfig }; diff --git a/src/partition/partitionmap.js b/src/partition/partitionmap.js index 2d7a3c96..d3a96330 100644 --- a/src/partition/partitionmap.js +++ b/src/partition/partitionmap.js @@ -49,6 +49,22 @@ class PartitionMap { } static fromJSON(json) { + if (json === null) { + throw new Error('Cannot construct PartitionMap: input is null'); + } else if (typeof json !== 'object') { + throw new Error(`Cannot construct PartitionMap from input "${json}" of type ` + + typeof json); + } else if (!json.partitions || typeof json.partitions !== 'object') { + throw new Error('Cannot construct PartitionMap: field partitions must be ' + + `an object but was "${json.partitions}"`); + } else if (!json.overrides || typeof json.overrides !== 'object') { + throw new Error('Cannot construct PartitionMap: field overrides must be ' + + `an object but was "${json.overrides}"`); + } else if (!json.pool || !Array.isArray(json.pool)) { + throw new Error('Cannot construct PartitionMap: field pool must be ' + + `an array but was "${json.pool}"`); + } + return new PartitionMap(json.partitions, json.pool, json.overrides); } diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index d698b012..52a74bab 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -50,6 +50,7 @@ class PartitionModule { if (!this.partitionMapReloader) { const redisProvider = this.getRedisClientProvider(); this.partitionMapReloader = new RedisPartitionMapReloader( + this.partitionConfig, redisProvider.get(), // Client for GET partitionMap redisProvider.get()); // Subscribe client } diff --git a/src/partition/redispartitionmapreloader.js b/src/partition/redispartitionmapreloader.js index 114a94ba..a4eddc94 100644 --- a/src/partition/redispartitionmapreloader.js +++ b/src/partition/redispartitionmapreloader.js @@ -3,8 +3,9 @@ import logger from 'cytube-common/lib/logger'; import { EventEmitter } from 'events'; class RedisPartitionMapReloader extends EventEmitter { - constructor(redisClient, subClient) { + constructor(config, redisClient, subClient) { super(); + this.config = config; this.redisClient = redisClient; this.subClient = subClient; this.partitionMap = PartitionMap.empty(); @@ -13,20 +14,21 @@ class RedisPartitionMapReloader extends EventEmitter { } subscribe() { - this.subClient.subscribe('partitionMap'); + this.subClient.subscribe(this.config.getPublishChannel()); this.subClient.on('message', (channel, message) => { - if (channel !== 'partitionMap') { + if (channel !== this.config.getPublishChannel()) { logger.warn('RedisPartitionMapReloader received unexpected message ' + `on redis channel ${channel}`); return; } + logger.info(`Received partition map update message published at ${message}`); this.reload(); }); } reload() { - this.redisClient.getAsync('partitionMap').then(result => { + this.redisClient.getAsync(this.config.getPartitionMapKey()).then(result => { var newMap = null; try { newMap = PartitionMap.fromJSON(JSON.parse(result));