From edb5fb6f4e2a28679adf7ef6891b4db8aade1483 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Sun, 3 Jul 2016 21:28:43 -0700 Subject: [PATCH] Sync announcements across partitions --- package.json | 2 +- src/partition/announcementrefresher.js | 61 ++++++++++++++++++++++++++ src/partition/partitionmodule.js | 15 ++++++- src/server.js | 17 ++++++- 4 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 src/partition/announcementrefresher.js diff --git a/package.json b/package.json index 60a8ab55..838072bd 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.17.4", + "version": "3.17.5", "repository": { "url": "http://github.com/calzoneman/sync" }, diff --git a/src/partition/announcementrefresher.js b/src/partition/announcementrefresher.js new file mode 100644 index 00000000..afb135f8 --- /dev/null +++ b/src/partition/announcementrefresher.js @@ -0,0 +1,61 @@ +import Logger from '../logger'; +import uuid from 'uuid'; + +var SERVER; +const SERVER_ANNOUNCEMENTS = 'serverAnnouncements'; + +class AnnouncementRefresher { + constructor(pubClient, subClient) { + this.pubClient = pubClient; + this.subClient = subClient; + this.uuid = uuid.v4(); + process.nextTick(this.init.bind(this)); + } + + init() { + SERVER = require('../server').getServer(); + SERVER.on('announcement', this.sendAnnouncement.bind(this)); + + this.subClient.once('ready', () => { + this.subClient.on('message', this.handleMessage.bind(this)); + this.subClient.subscribe(SERVER_ANNOUNCEMENTS); + }); + } + + handleMessage(channel, message) { + if (channel !== SERVER_ANNOUNCEMENTS) { + return; + } + + var data; + try { + data = JSON.parse(message); + } catch (error) { + Logger.errlog.log('Unable to unmarshal server announcement: ' + error.stack + + '\nMessage was: ' + message); + return; + } + + if (data.partitionID === this.uuid) { + return; + } + + SERVER.setAnnouncement({ + title: data.title, + text: data.text, + from: data.from + }); + } + + sendAnnouncement(data) { + const message = JSON.stringify({ + title: data.title, + text: data.text, + from: data.from, + partitionID: this.uuid + }); + this.pubClient.publish(SERVER_ANNOUNCEMENTS, message); + } +} + +export { AnnouncementRefresher }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 5a5188eb..6c3b61cb 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -6,6 +6,7 @@ import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider'; import logger from 'cytube-common/lib/logger'; import LegacyConfig from '../config'; import path from 'path'; +import { AnnouncementRefresher } from './announcementrefresher'; const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', 'partitions.toml'); @@ -16,7 +17,7 @@ class PartitionModule { } onReady() { - + this.getAnnouncementRefresher(); } initConfig() { @@ -69,6 +70,18 @@ class PartitionModule { return this.redisClientProvider; } + + getAnnouncementRefresher() { + if (!this.announcementRefresher) { + const provider = this.getRedisClientProvider(); + this.announcementRefresher = new AnnouncementRefresher( + provider.get(), + provider.get() + ); + } + + return this.announcementRefresher; + } } export { PartitionModule }; diff --git a/src/server.js b/src/server.js index 0e64e961..593e21b4 100644 --- a/src/server.js +++ b/src/server.js @@ -3,6 +3,7 @@ var singleton = null; var Config = require("./config"); var Promise = require("bluebird"); import * as ChannelStore from './channel-storage/channelstore'; +import { EventEmitter } from 'events'; module.exports = { init: function () { @@ -163,6 +164,8 @@ var Server = function () { initModule.onReady(); }; +Server.prototype = Object.create(EventEmitter.prototype); + Server.prototype.getHTTPIP = function (req) { var ip = req.ip; if (ip === "127.0.0.1" || ip === "::1") { @@ -285,12 +288,22 @@ Server.prototype.packChannelList = function (publicOnly, isAdmin) { }; Server.prototype.announce = function (data) { + this.setAnnouncement(data); + if (data == null) { - this.announcement = null; db.clearAnnouncement(); } else { - this.announcement = data; db.setAnnouncement(data); + } + + this.emit("announcement", data); +}; + +Server.prototype.setAnnouncement = function (data) { + if (data == null) { + this.announcement = null; + } else { + this.announcement = data; sio.instance.emit("announcement", data); } };