375 lines
12 KiB
JavaScript
375 lines
12 KiB
JavaScript
import sio from 'socket.io';
|
|
import db from '../database';
|
|
import User from '../user';
|
|
import Server from '../server';
|
|
import Config from '../config';
|
|
const cookieParser = require("cookie-parser")(Config.get("http.cookie-secret"));
|
|
import typecheck from 'json-typecheck';
|
|
import { isTorExit } from '../tor';
|
|
import session from '../session';
|
|
import counters from '../counters';
|
|
import { verifyIPSessionCookie } from '../web/middleware/ipsessioncookie';
|
|
import Promise from 'bluebird';
|
|
const verifySession = Promise.promisify(session.verifySession);
|
|
const getAliases = Promise.promisify(db.getAliases);
|
|
import { CachingGlobalBanlist } from './globalban';
|
|
import proxyaddr from 'proxy-addr';
|
|
import { Counter, Gauge } from 'prom-client';
|
|
import Socket from 'socket.io/lib/socket';
|
|
import { TokenBucket } from '../util/token-bucket';
|
|
import http from 'http';
|
|
|
|
const LOGGER = require('@calzoneman/jsli')('ioserver');
|
|
|
|
// WIP, not in use yet
|
|
class IOServer {
|
|
constructor(options = {
|
|
proxyTrustFn: proxyaddr.compile('127.0.0.1')
|
|
}) {
|
|
({
|
|
proxyTrustFn: this.proxyTrustFn
|
|
} = options);
|
|
|
|
this.ipThrottle = new Map();
|
|
this.ipCount = new Map();
|
|
}
|
|
|
|
// Map proxied sockets to the real IP address via X-Forwarded-For
|
|
// If the resulting address is a known Tor exit, flag it as such
|
|
ipProxyMiddleware(socket, next) {
|
|
if (!socket.context) socket.context = {};
|
|
socket.context.ipAddress = proxyaddr(socket.client.request, this.proxyTrustFn);
|
|
if (isTorExit(socket.context.ipAddress)) {
|
|
socket.context.torConnection = true;
|
|
}
|
|
next();
|
|
}
|
|
|
|
// Reject global banned IP addresses
|
|
ipBanMiddleware(socket, next) {
|
|
if (isIPGlobalBanned(socket.context.ipAddress)) {
|
|
LOGGER.info('Rejecting %s - banned',
|
|
socket.context.ipAddress);
|
|
next(new Error('You are banned from the server'));
|
|
return;
|
|
}
|
|
|
|
next();
|
|
}
|
|
|
|
// Rate limit connection attempts by IP address
|
|
ipThrottleMiddleware(socket, next) {
|
|
if (!this.ipThrottle.has(socket.context.ipAddress)) {
|
|
this.ipThrottle.set(socket.context.ipAddress, new TokenBucket(5, 0.1));
|
|
}
|
|
|
|
const bucket = this.ipThrottle.get(socket.context.ipAddress);
|
|
if (bucket.throttle()) {
|
|
LOGGER.info('Rejecting %s - exceeded connection rate limit',
|
|
socket.context.ipAddress);
|
|
next(new Error('Rate limit exceeded'));
|
|
return;
|
|
}
|
|
|
|
next();
|
|
}
|
|
|
|
ipConnectionLimitMiddleware(socket, next) {
|
|
const ip = socket.context.ipAddress;
|
|
const count = this.ipCount.get(ip) || 0;
|
|
if (count >= Config.get('io.ip-connection-limit')) {
|
|
// TODO: better error message would be nice
|
|
next(new Error('Too many connections from your IP address'));
|
|
return;
|
|
}
|
|
|
|
this.ipCount.set(ip, count + 1);
|
|
socket.once('disconnect', () => {
|
|
this.ipCount.set(ip, this.ipCount.get(ip) - 1);
|
|
});
|
|
|
|
next();
|
|
}
|
|
|
|
// Parse cookies
|
|
cookieParsingMiddleware(socket, next) {
|
|
const req = socket.request;
|
|
if (req.headers.cookie) {
|
|
cookieParser(req, null, () => next());
|
|
} else {
|
|
req.cookies = {};
|
|
req.signedCookies = {};
|
|
next();
|
|
}
|
|
}
|
|
|
|
// Determine session age from ip-session cookie
|
|
// (Used for restricting chat)
|
|
ipSessionCookieMiddleware(socket, next) {
|
|
const cookie = socket.request.signedCookies['ip-session'];
|
|
if (!cookie) {
|
|
socket.context.ipSessionFirstSeen = new Date();
|
|
next();
|
|
return;
|
|
}
|
|
|
|
const sessionMatch = verifyIPSessionCookie(socket.context.ipAddress, cookie);
|
|
if (sessionMatch) {
|
|
socket.context.ipSessionFirstSeen = sessionMatch.date;
|
|
} else {
|
|
socket.context.ipSessionFirstSeen = new Date();
|
|
}
|
|
next();
|
|
}
|
|
|
|
// Match login cookie against the DB, look up aliases
|
|
authUserMiddleware(socket, next) {
|
|
socket.context.aliases = [];
|
|
|
|
const promises = [];
|
|
const auth = socket.request.signedCookies.auth;
|
|
if (auth) {
|
|
promises.push(verifySession(auth).then(user => {
|
|
socket.context.user = Object.assign({}, user);
|
|
}).catch(error => {
|
|
LOGGER.warn('Unable to verify session for %s - ignoring auth',
|
|
socket.context.ipAddress);
|
|
}));
|
|
}
|
|
|
|
promises.push(getAliases(socket.context.ipAddress).then(aliases => {
|
|
socket.context.aliases = aliases;
|
|
}).catch(error => {
|
|
LOGGER.warn('Unable to load aliases for %s',
|
|
socket.context.ipAddress);
|
|
}));
|
|
|
|
Promise.all(promises).then(() => next());
|
|
}
|
|
|
|
metricsEmittingMiddleware(socket, next) {
|
|
emitMetrics(socket);
|
|
next();
|
|
}
|
|
|
|
handleConnection(socket) {
|
|
LOGGER.info('Accepted socket from %s', socket.context.ipAddress);
|
|
counters.add('socket.io:accept', 1);
|
|
socket.once('disconnect', () => counters.add('socket.io:disconnect', 1));
|
|
|
|
const user = new User(socket, socket.context.ipAddress, socket.context.user);
|
|
if (socket.context.user) {
|
|
db.recordVisit(socket.context.ipAddress, user.getName());
|
|
}
|
|
|
|
const announcement = Server.getServer().announcement;
|
|
if (announcement !== null) {
|
|
socket.emit('announcement', announcement);
|
|
}
|
|
}
|
|
|
|
initSocketIO() {
|
|
patchSocketMetrics();
|
|
patchTypecheckedFunctions();
|
|
|
|
const io = this.io = sio.instance = sio();
|
|
io.use(this.ipProxyMiddleware.bind(this));
|
|
io.use(this.ipBanMiddleware.bind(this));
|
|
io.use(this.ipThrottleMiddleware.bind(this));
|
|
io.use(this.ipConnectionLimitMiddleware.bind(this));
|
|
io.use(this.cookieParsingMiddleware.bind(this));
|
|
io.use(this.ipSessionCookieMiddleware.bind(this));
|
|
io.use(this.authUserMiddleware.bind(this));
|
|
io.use(this.metricsEmittingMiddleware.bind(this));
|
|
io.on('connection', this.handleConnection.bind(this));
|
|
}
|
|
|
|
bindTo(servers) {
|
|
if (!this.io) {
|
|
throw new Error('Cannot bind: socket.io has not been initialized yet');
|
|
}
|
|
|
|
servers.forEach(server => {
|
|
this.io.attach(server);
|
|
});
|
|
}
|
|
}
|
|
|
|
const incomingEventCount = new Counter({
|
|
name: 'cytube_socketio_incoming_events_total',
|
|
help: 'Number of received socket.io events from clients'
|
|
});
|
|
const outgoingPacketCount = new Counter({
|
|
name: 'cytube_socketio_outgoing_packets_total',
|
|
help: 'Number of outgoing socket.io packets to clients'
|
|
});
|
|
function patchSocketMetrics() {
|
|
const onevent = Socket.prototype.onevent;
|
|
const packet = Socket.prototype.packet;
|
|
|
|
Socket.prototype.onevent = function patchedOnevent() {
|
|
onevent.apply(this, arguments);
|
|
incomingEventCount.inc(1, new Date());
|
|
};
|
|
|
|
Socket.prototype.packet = function patchedPacket() {
|
|
packet.apply(this, arguments);
|
|
outgoingPacketCount.inc(1, new Date());
|
|
};
|
|
}
|
|
|
|
/* TODO: remove this crap */
|
|
function patchTypecheckedFunctions() {
|
|
Socket.prototype.typecheckedOn = function typecheckedOn(msg, template, cb) {
|
|
this.on(msg, (data, ack) => {
|
|
typecheck(data, template, (err, data) => {
|
|
if (err) {
|
|
this.emit("errorMsg", {
|
|
msg: "Unexpected error for message " + msg + ": " + err.message
|
|
});
|
|
} else {
|
|
cb(data, ack);
|
|
}
|
|
});
|
|
});
|
|
};
|
|
|
|
Socket.prototype.typecheckedOnce = function typecheckedOnce(msg, template, cb) {
|
|
this.once(msg, data => {
|
|
typecheck(data, template, (err, data) => {
|
|
if (err) {
|
|
this.emit("errorMsg", {
|
|
msg: "Unexpected error for message " + msg + ": " + err.message
|
|
});
|
|
} else {
|
|
cb(data);
|
|
}
|
|
});
|
|
});
|
|
};
|
|
}
|
|
|
|
let globalIPBanlist = null;
|
|
function isIPGlobalBanned(ip) {
|
|
if (globalIPBanlist === null) {
|
|
globalIPBanlist = new CachingGlobalBanlist(db.getGlobalBanDB());
|
|
globalIPBanlist.refreshCache();
|
|
globalIPBanlist.startCacheTimer(60 * 1000);
|
|
}
|
|
|
|
return globalIPBanlist.isIPGlobalBanned(ip);
|
|
}
|
|
|
|
const promSocketCount = new Gauge({
|
|
name: 'cytube_sockets_num_connected',
|
|
help: 'Gauge of connected socket.io clients',
|
|
labelNames: ['transport']
|
|
});
|
|
const promSocketAccept = new Counter({
|
|
name: 'cytube_sockets_accepts_total',
|
|
help: 'Counter for number of connections accepted. Excludes rejected connections.'
|
|
});
|
|
const promSocketDisconnect = new Counter({
|
|
name: 'cytube_sockets_disconnects_total',
|
|
help: 'Counter for number of connections disconnected.'
|
|
});
|
|
function emitMetrics(sock) {
|
|
try {
|
|
let closed = false;
|
|
let transportName = sock.client.conn.transport.name;
|
|
promSocketCount.inc({ transport: transportName });
|
|
promSocketAccept.inc(1, new Date());
|
|
|
|
sock.client.conn.on('upgrade', newTransport => {
|
|
try {
|
|
// Sanity check
|
|
if (!closed && newTransport.name !== transportName) {
|
|
promSocketCount.dec({ transport: transportName });
|
|
transportName = newTransport.name;
|
|
promSocketCount.inc({ transport: transportName });
|
|
}
|
|
} catch (error) {
|
|
LOGGER.error('Error emitting transport upgrade metrics for socket (ip=%s): %s',
|
|
sock.context.ipAddress, error.stack);
|
|
}
|
|
});
|
|
|
|
sock.once('disconnect', () => {
|
|
try {
|
|
closed = true;
|
|
promSocketCount.dec({ transport: transportName });
|
|
promSocketDisconnect.inc(1, new Date());
|
|
} catch (error) {
|
|
LOGGER.error('Error emitting disconnect metrics for socket (ip=%s): %s',
|
|
sock.context.ipAddress, error.stack);
|
|
}
|
|
});
|
|
} catch (error) {
|
|
LOGGER.error('Error emitting metrics for socket (ip=%s): %s',
|
|
sock.context.ipAddress, error.stack);
|
|
}
|
|
}
|
|
|
|
let instance = null;
|
|
|
|
module.exports = {
|
|
init: function (srv, webConfig) {
|
|
if (instance !== null) {
|
|
throw new Error('ioserver.init: already initialized');
|
|
}
|
|
|
|
const ioServer = instance = new IOServer({
|
|
proxyTrustFn: proxyaddr.compile(webConfig.getTrustedProxies())
|
|
});
|
|
|
|
ioServer.initSocketIO();
|
|
|
|
const uniqueListenAddresses = new Set();
|
|
const servers = [];
|
|
|
|
Config.get("listen").forEach(function (bind) {
|
|
if (!bind.io) {
|
|
return;
|
|
}
|
|
|
|
const id = bind.ip + ":" + bind.port;
|
|
if (uniqueListenAddresses.has(id)) {
|
|
LOGGER.warn("Ignoring duplicate listen address %s", id);
|
|
return;
|
|
}
|
|
|
|
if (srv.servers.hasOwnProperty(id)) {
|
|
servers.push(srv.servers[id]);
|
|
} else {
|
|
const server = http.createServer().listen(bind.port, bind.ip);
|
|
servers.push(server);
|
|
}
|
|
|
|
uniqueListenAddresses.add(id);
|
|
});
|
|
|
|
ioServer.bindTo(servers);
|
|
},
|
|
|
|
IOServer: IOServer
|
|
};
|
|
|
|
/* Clean out old rate limiters */
|
|
setInterval(function () {
|
|
if (instance == null) return;
|
|
|
|
let cleaned = 0;
|
|
const keys = instance.ipThrottle.keys();
|
|
for (const key of keys) {
|
|
if (instance.ipThrottle.get(key).lastRefill < Date.now() - 60000) {
|
|
const bucket = instance.ipThrottle.delete(key);
|
|
for (const k in bucket) delete bucket[k];
|
|
cleaned++;
|
|
}
|
|
}
|
|
|
|
if (cleaned > 0) {
|
|
LOGGER.info('Cleaned up %d stale IP throttle token buckets', cleaned);
|
|
}
|
|
}, 5 * 60 * 1000);
|