Upgrade to socket.io v4

This commit is contained in:
Calvin Montgomery 2021-08-14 20:28:24 -07:00
parent 1b7e7c74f5
commit 7214b7c474
7 changed files with 148 additions and 285 deletions

View file

@ -60,7 +60,10 @@ var defaults = {
io: {
domain: "http://localhost",
"default-port": 1337,
"ip-connection-limit": 10
"ip-connection-limit": 10,
cors: {
"allowed-origins": []
}
},
"youtube-v3-key": "",
"channel-blacklist": [],

View file

@ -14,7 +14,6 @@ const getAliases = Promise.promisify(db.getAliases);
import { CachingGlobalBanlist } from './globalban';
import proxyaddr from 'proxy-addr';
import { Counter, Gauge } from 'prom-client';
import Socket from 'socket.io/lib/socket';
import { TokenBucket } from '../util/token-bucket';
import http from 'http';
@ -108,28 +107,6 @@ class IOServer {
next();
}
/*
TODO: see https://github.com/calzoneman/sync/issues/724
ipConnectionLimitMiddleware(socket, next) {
const ip = socket.context.ipAddress;
const count = this.ipCount.get(ip) || 0;
if (count >= Config.get('io.ip-connection-limit')) {
// TODO: better error message would be nice
next(new Error('Too many connections from your IP address'));
return;
}
this.ipCount.set(ip, count + 1);
console.log(ip, this.ipCount.get(ip));
socket.once('disconnect', () => {
console.log('Disconnect event has fired for', socket.id);
this.ipCount.set(ip, this.ipCount.get(ip) - 1);
});
next();
}
*/
checkIPLimit(socket) {
const ip = socket.context.ipAddress;
const count = this.ipCount.get(ip) || 0;
@ -219,9 +196,12 @@ class IOServer {
handleConnection(socket) {
if (!this.checkIPLimit(socket)) {
return;
//return;
}
patchTypecheckedFunctions(socket);
patchSocketMetrics(socket);
this.setRateLimiter(socket);
emitMetrics(socket);
@ -268,14 +248,10 @@ class IOServer {
}
initSocketIO() {
patchSocketMetrics();
patchTypecheckedFunctions();
const io = this.io = sio.instance = sio();
io.use(this.ipProxyMiddleware.bind(this));
io.use(this.ipBanMiddleware.bind(this));
io.use(this.ipThrottleMiddleware.bind(this));
//io.use(this.ipConnectionLimitMiddleware.bind(this));
io.use(this.cookieParsingMiddleware.bind(this));
io.use(this.ipSessionCookieMiddleware.bind(this));
io.use(this.authUserMiddleware.bind(this));
@ -290,7 +266,7 @@ class IOServer {
const engineOpts = {
/*
* Set ping timeout to 2 minutes to avoid spurious reconnects
* during transient network issues. The default of 5 minutes
* during transient network issues. The default of 20 seconds
* is too aggressive.
*
* https://github.com/calzoneman/sync/issues/780
@ -309,11 +285,17 @@ class IOServer {
perMessageDeflate: false,
httpCompression: false,
maxHttpBufferSize: 1 << 20,
/*
* Default is 10MB.
* Even 1MiB seems like a generous limit...
* Enable legacy support for socket.io v2 clients (e.g., bots)
*/
maxHttpBufferSize: 1 << 20
allowEIO3: true,
cors: {
origin: getCorsAllowCallback(),
credentials: true // enable cookies for auth
}
};
servers.forEach(server => {
@ -330,26 +312,25 @@ const outgoingPacketCount = new Counter({
name: 'cytube_socketio_outgoing_packets_total',
help: 'Number of outgoing socket.io packets to clients'
});
function patchSocketMetrics() {
const onevent = Socket.prototype.onevent;
const packet = Socket.prototype.packet;
function patchSocketMetrics(sock) {
const emit = require('events').EventEmitter.prototype.emit;
Socket.prototype.onevent = function patchedOnevent() {
onevent.apply(this, arguments);
sock.onAny(() => {
incomingEventCount.inc(1);
emit.call(this, 'cytube:count-event');
};
emit.call(sock, 'cytube:count-event');
});
Socket.prototype.packet = function patchedPacket() {
let packet = sock.packet;
sock.packet = function patchedPacket() {
packet.apply(this, arguments);
outgoingPacketCount.inc(1);
};
}.bind(sock);
}
/* TODO: remove this crap */
function patchTypecheckedFunctions() {
Socket.prototype.typecheckedOn = function typecheckedOn(msg, template, cb) {
/* Addendum 2021-08-14: socket.io v4 supports middleware, maybe move type validation to that */
function patchTypecheckedFunctions(sock) {
sock.typecheckedOn = function typecheckedOn(msg, template, cb) {
this.on(msg, (data, ack) => {
typecheck(data, template, (err, data) => {
if (err) {
@ -361,9 +342,9 @@ function patchTypecheckedFunctions() {
}
});
});
};
}.bind(sock);
Socket.prototype.typecheckedOnce = function typecheckedOnce(msg, template, cb) {
sock.typecheckedOnce = function typecheckedOnce(msg, template, cb) {
this.once(msg, data => {
typecheck(data, template, (err, data) => {
if (err) {
@ -375,7 +356,7 @@ function patchTypecheckedFunctions() {
}
});
});
};
}.bind(sock);
}
let globalIPBanlist = null;
@ -409,16 +390,17 @@ const promSocketReconnect = new Counter({
function emitMetrics(sock) {
try {
let closed = false;
let transportName = sock.client.conn.transport.name;
let transportName = sock.conn.transport.name;
promSocketCount.inc({ transport: transportName });
promSocketAccept.inc(1);
sock.client.conn.on('upgrade', newTransport => {
sock.conn.on('upgrade', () => {
try {
let newTransport = sock.conn.transport.name;
// Sanity check
if (!closed && newTransport.name !== transportName) {
if (!closed && newTransport !== transportName) {
promSocketCount.dec({ transport: transportName });
transportName = newTransport.name;
transportName = newTransport;
promSocketCount.inc({ transport: transportName });
}
} catch (error) {
@ -526,3 +508,30 @@ setInterval(function () {
LOGGER.info('Cleaned up %d stale IP throttle token buckets', cleaned);
}
}, 5 * 60 * 1000);
function getCorsAllowCallback() {
let origins = Array.prototype.slice.call(Config.get('io.cors.allowed-origins'));
origins = origins.concat([
Config.get('io.domain'),
Config.get('https.domain')
]);
return function corsOriginAllowed(origin, callback) {
if (!origin) {
// Non-browser clients might not care about Origin, allow these.
callback(null, true);
return;
}
// Different ports are technically cross-origin; a distinction that does not matter to CyTube.
origin = origin.replace(/:\d+$/, '');
if (origins.includes(origin)) {
callback(null, true);
} else {
LOGGER.warn('Rejecting origin "%s"; allowed origins are %j', origin, origins);
callback(new Error('Invalid origin'));
}
};
}