diff --git a/package.json b/package.json index 27969620..be9bf2ef 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "sanitize-html": "git://github.com/calzoneman/sanitize-html", "serve-static": "^1.10.0", "socket.io": "^1.3.7", + "socket.io-redis": "^1.0.0", "source-map-support": "^0.3.2", "status-message-polyfill": "calzoneman/status-message-polyfill", "yamljs": "^0.1.6" diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index 5bd465b3..1dcd83c0 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -1,20 +1,62 @@ +import ioServer from '../ioserver'; +import ProxiedSocket from './proxiedsocket'; + export default class FrontendManager { - constructor() { + constructor(socketEmitter) { + this.socketEmitter = socketEmitter; this.frontendConnections = {}; + this.frontendProxiedSockets = {}; } onConnection(socket) { - if (this.frontendConnections.hasOwnProperty(socket.remoteAddress)) { + if (this.frontendConnections.hasOwnProperty(socket.remoteAddressAndPort)) { // TODO: do some validation, maybe check if the socket is still connected? throw new Error(); } this.frontendConnections[socket.remoteAddressAndPort] = socket; - console.log(socket.remoteAddressAndPort); socket.on('data', this.onData.bind(this, socket)); } onData(socket, data) { - console.log(data); + switch (data.$type) { + case 'socketConnect': + this.onSocketConnect(socket, data); + break; + case 'socketFrame': + this.onSocketFrame(socket, data); + break; + } + } + + onSocketConnect(frontendConnection, data) { + const mapKey = frontendConnection.remoteAddressAndPort; + const proxiedSocket = new ProxiedSocket( + data.socketID, + data.socketData, + this.socketEmitter, + frontendConnection); + + if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { + this.frontendProxiedSockets[mapKey] = {}; + } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(data.socketID)) { + // TODO: Handle this gracefully + throw new Error(); + } + + this.frontendProxiedSockets[mapKey][data.socketID] = proxiedSocket; + ioServer.handleConnection(proxiedSocket); + } + + onSocketFrame(frontendConnection, data) { + const mapKey = frontendConnection.remoteAddressAndPort; + const socketMap = this.frontendProxiedSockets[mapKey]; + if (!socketMap || !socketMap.hasOwnProperty(data.socketID)) { + // TODO + throw new Error(); + } + + const socket = socketMap[data.socketID]; + socket.onProxiedEventReceived.apply(socket, [data.event].concat(data.args)); } } diff --git a/src/io/backend/iobackend.js b/src/io/backend/iobackend.js index e0c8d78d..9d0de5c2 100644 --- a/src/io/backend/iobackend.js +++ b/src/io/backend/iobackend.js @@ -2,14 +2,15 @@ import Server from 'cytube-common/lib/tcpjson/server'; import FrontendManager from './frontendmanager'; export default class IOBackend { - constructor(proxyListenerConfig) { + constructor(proxyListenerConfig, socketEmitter) { this.proxyListenerConfig = proxyListenerConfig; + this.socketEmitter = socketEmitter; this.initFrontendManager(); this.initProxyListener(); } initFrontendManager() { - this.frontendManager = new FrontendManager(); + this.frontendManager = new FrontendManager(this.socketEmitter); } initProxyListener() { diff --git a/src/io/backend/proxiedsocket.js b/src/io/backend/proxiedsocket.js index 38e004b6..64db50a2 100644 --- a/src/io/backend/proxiedsocket.js +++ b/src/io/backend/proxiedsocket.js @@ -11,7 +11,7 @@ export default class ProxiedSocket extends EventEmitter { } emit() { - const target = socketEmitter.to(this.id); + const target = this.socketEmitter.to(this.id); target.emit.apply(target, arguments); } diff --git a/src/io/ioserver.js b/src/io/ioserver.js index 0f625783..6def95f8 100644 --- a/src/io/ioserver.js +++ b/src/io/ioserver.js @@ -273,7 +273,9 @@ module.exports = { bound[id] = null; }); - } + }, + + handleConnection: handleConnection }; /* Clean out old rate limiters */ diff --git a/src/server.js b/src/server.js index fd79c9f1..20c8c7e9 100644 --- a/src/server.js +++ b/src/server.js @@ -127,6 +127,20 @@ var Server = function () { }); require("./io/ioserver").init(self, webConfig); + const redisAdapter = require('socket.io-redis'); + const IOBackend = require('./io/backend/iobackend'); + const sioEmitter = require("socket.io").instance; + sioEmitter.adapter(redisAdapter()); + const listenerConfig = { + getPort: function () { + return 3071; + }, + + getHost: function () { + return '127.0.0.1'; + } + }; + const backend = new IOBackend(listenerConfig, sioEmitter); // background tasks init ---------------------------------------------- require("./bgtask")(self);