diff --git a/NEWS.md b/NEWS.md index f30f0e0a..45446c1b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,22 @@ +2015-10-04 +========== + + * The channel data storage system has been refactored a bit. For + compatibility, the default remains to store JSON objects for each channel in + the `chandump` folder, however there is now also the option of storing + channel data in the database. You can take advantage of this by setting + `channel-storage: type: 'database'` in your `config.yaml`. + - In order to migrate existing channel data from the `chandump` files to the + database, run `node lib/channel-storage/migrator.js`. + * The database storage method uses foreign keys to associate the channel data + with the corresponding row in the `channels` table. This requires that the + tables be stored using the InnoDB engine rather than MyISAM. If your CyTube + tables defaulted to MyISAM, you can fix them by running + + ```sql + ALTER TABLE `channels` ENGINE = InnoDB; + ``` + 2015-09-21 ========== diff --git a/config.template.yaml b/config.template.yaml index 78782b1c..f50e2a20 100644 --- a/config.template.yaml +++ b/config.template.yaml @@ -219,3 +219,13 @@ setuid: user: 'user' # how long to wait in ms before changing uid/gid timeout: 15 + +# Determines channel data storage mechanism. +# Defaults to 'file', in which channel data is JSON stringified and saved to a file +# in the `chandump/` folder. This is the legacy behavior of CyTube. +# The other possible option is 'database', in which case each key-value pair of +# channel data is stored as a row in the `channel_data` database table. +# To migrate legacy chandump files to the database, shut down CyTube (to prevent +# concurrent updates), then run `node lib/channel-storage/migrator.js`. 
+channel-storage: + type: 'file' diff --git a/package.json b/package.json index 175451e3..a4a22f07 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.10.0", + "version": "3.11.0", "repository": { "url": "http://github.com/calzoneman/sync" }, @@ -10,10 +10,12 @@ "dependencies": { "babel": "^5.8.23", "bcrypt": "^0.8.5", + "bluebird": "^2.10.1", "body-parser": "^1.14.0", "cheerio": "^0.19.0", "compression": "^1.5.2", "cookie-parser": "^1.4.0", + "create-error": "^0.3.1", "csrf": "^3.0.0", "cytube-mediaquery": "git://github.com/CyTube/mediaquery", "cytubefilters": "git://github.com/calzoneman/cytubefilters#095b7956", @@ -37,9 +39,9 @@ }, "scripts": { "build-player": "$npm_node_execpath build-player.js", - "build-server": "babel --source-maps --out-dir lib/ src/", + "build-server": "babel --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/", "postinstall": "./postinstall.sh", - "server-dev": "babel --watch --source-maps --out-dir lib/ src/" + "server-dev": "babel --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/" }, "devDependencies": { "coffee-script": "^1.9.2" diff --git a/src/bgtask.js b/src/bgtask.js index 02aca717..a966f60d 100644 --- a/src/bgtask.js +++ b/src/bgtask.js @@ -64,7 +64,9 @@ function initChannelDumper(Server) { for (var i = 0; i < Server.channels.length; i++) { var chan = Server.channels[i]; if (!chan.dead && chan.users && chan.users.length > 0) { - chan.saveState(); + chan.saveState().catch(err => { + Logger.errlog.log(`Failed to save /r/${chan.name}: ${err.stack}`); + }); } } }, CHANNEL_SAVE_INTERVAL); diff --git a/src/channel-storage/channelstore.js b/src/channel-storage/channelstore.js new file mode 100644 index 00000000..278dab46 --- /dev/null +++ b/src/channel-storage/channelstore.js @@ -0,0 +1,36 @@ +import { FileStore } from './filestore'; +import { DatabaseStore } from 
'./dbstore'; +import Config from '../config'; +import Promise from 'bluebird'; + +var CHANNEL_STORE = null; + +export function init() { + CHANNEL_STORE = loadChannelStore(); +} + +export function load(channelName) { + if (CHANNEL_STORE === null) { + return Promise.reject(new Error('ChannelStore not initialized yet')); + } + + return CHANNEL_STORE.load(channelName); +} + +export function save(channelName, data) { + if (CHANNEL_STORE === null) { + return Promise.reject(new Error('ChannelStore not initialized yet')); + } + + return CHANNEL_STORE.save(channelName, data); +} + +function loadChannelStore() { + switch (Config.get('channel-storage.type')) { + case 'database': + return new DatabaseStore(); + case 'file': + default: + return new FileStore(); + } +} diff --git a/src/channel-storage/dbstore.js b/src/channel-storage/dbstore.js new file mode 100644 index 00000000..c53d7a62 --- /dev/null +++ b/src/channel-storage/dbstore.js @@ -0,0 +1,91 @@ +import Promise from 'bluebird'; +import { ChannelStateSizeError, + ChannelNotFoundError } from '../errors'; +import db from '../database'; +import Logger from '../logger'; + +const SIZE_LIMIT = 1048576; +const QUERY_CHANNEL_ID_FOR_NAME = 'SELECT id FROM channels WHERE name = ?'; +const QUERY_CHANNEL_DATA = 'SELECT `key`, `value` FROM channel_data WHERE channel_id = ?'; + +function queryAsync(query, substitutions) { + return new Promise((resolve, reject) => { + db.query(query, substitutions, (err, res) => { + if (err) { + if (!(err instanceof Error)) { + err = new Error(err); + } + reject(err); + } else { + resolve(res); + } + }); + }); +} + +function buildUpdateQuery(numEntries) { + const values = []; + for (let i = 0; i < numEntries; i++) { + values.push('(?, ?, ?)'); + } + + return `INSERT INTO channel_data VALUES ${values.join(', ')} ` + + 'ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)'; +} + +export class DatabaseStore { + load(channelName) { + return queryAsync(QUERY_CHANNEL_ID_FOR_NAME, [channelName]).then((rows) => 
{ + if (rows.length === 0) { + throw new ChannelNotFoundError(`Channel does not exist: "${channelName}"`); + } + + return queryAsync(QUERY_CHANNEL_DATA, [rows[0].id]); + }).then(rows => { + const data = {}; + rows.forEach(row => { + try { + data[row.key] = JSON.parse(row.value); + } catch (e) { + Logger.errlog.log(`Channel data for channel "${channelName}", ` + + `key "${row.key}" is invalid: ${e}`); + } + }); + + return data; + }); + } + + save(channelName, data) { + return queryAsync(QUERY_CHANNEL_ID_FOR_NAME, [channelName]).then((rows) => { + if (rows.length === 0) { + throw new ChannelNotFoundError(`Channel does not exist: "${channelName}"`); + } + + let totalSize = 0; + let rowCount = 0; + const id = rows[0].id; + const substitutions = []; + for (const key in data) { + if (typeof data[key] === 'undefined') { + continue; + } + rowCount++; + const value = JSON.stringify(data[key]); + totalSize += value.length; + substitutions.push(id); + substitutions.push(key); + substitutions.push(value); + } + + if (totalSize > SIZE_LIMIT) { + throw new ChannelStateSizeError('Channel state size is too large', { + limit: SIZE_LIMIT, + actual: totalSize + }); + } + + return queryAsync(buildUpdateQuery(rowCount), substitutions); + }); + } +} diff --git a/src/channel-storage/filestore.js b/src/channel-storage/filestore.js new file mode 100644 index 00000000..dc0f292c --- /dev/null +++ b/src/channel-storage/filestore.js @@ -0,0 +1,56 @@ +import * as Promise from 'bluebird'; +import { stat } from 'fs'; +import * as fs from 'graceful-fs'; +import path from 'path'; +import { ChannelStateSizeError } from '../errors'; + +const readFileAsync = Promise.promisify(fs.readFile); +const writeFileAsync = Promise.promisify(fs.writeFile); +const readdirAsync = Promise.promisify(fs.readdir); +const statAsync = Promise.promisify(stat); +const SIZE_LIMIT = 1048576; +const CHANDUMP_DIR = path.resolve(__dirname, '..', '..', 'chandump'); + +export class FileStore { + filenameForChannel(channelName) { 
+ return path.join(CHANDUMP_DIR, channelName); + } + + load(channelName) { + const filename = this.filenameForChannel(channelName); + return statAsync(filename).then(stats => { + if (stats.size > SIZE_LIMIT) { + throw new ChannelStateSizeError('Channel state file is too large', { + limit: SIZE_LIMIT, + actual: stats.size + }); + } else { + return readFileAsync(filename); + } + }).then(fileContents => { + try { + return JSON.parse(fileContents); + } catch (e) { + throw new Error('Channel state file is not valid JSON: ' + e); + } + }); + } + + save(channelName, data) { + const filename = this.filenameForChannel(channelName); + const fileContents = new Buffer(JSON.stringify(data), 'utf8'); + if (fileContents.length > SIZE_LIMIT) { + return Promise.reject(new ChannelStateSizeError( + 'Channel state size is too large', { + limit: SIZE_LIMIT, + actual: fileContents.length + })); + } + + return writeFileAsync(filename, fileContents); + } + + listChannels() { + return readdirAsync(CHANDUMP_DIR); + } +} diff --git a/src/channel-storage/migrator.js b/src/channel-storage/migrator.js new file mode 100644 index 00000000..25ae82ef --- /dev/null +++ b/src/channel-storage/migrator.js @@ -0,0 +1,164 @@ +import Config from '../config'; +import Promise from 'bluebird'; +import db from '../database'; +import { FileStore } from './filestore'; +import { DatabaseStore } from './dbstore'; +import { sanitizeHTML } from '../xss'; +import { ChannelNotFoundError } from '../errors'; + +const QUERY_CHANNEL_NAMES = 'SELECT name FROM channels WHERE 1'; +const EXPECTED_KEYS = [ + 'chatbuffer', + 'chatmuted', + 'css', + 'emotes', + 'filters', + 'js', + 'motd', + 'openPlaylist', + 'opts', + 'permissions', + 'playlist', + 'poll' +]; + +function queryAsync(query, substitutions) { + return new Promise((resolve, reject) => { + db.query(query, substitutions, (err, res) => { + if (err) { + if (!(err instanceof Error)) { + err = new Error(err); + } + reject(err); + } else { + resolve(res); + } + }); + }); 
+} + +function fixOldChandump(data) { + const converted = {}; + EXPECTED_KEYS.forEach(key => { + converted[key] = data[key]; + }); + + if (data.queue) { + converted.playlist = { + pl: data.queue.map(item => { + return { + media: { + id: item.id, + title: item.title, + seconds: item.seconds, + duration: item.duration, + type: item.type, + meta: {} + }, + queueby: item.queueby, + temp: item.temp + }; + }), + pos: data.position, + time: data.currentTime + }; + } + + if (data.hasOwnProperty('openqueue')) { + converted.openPlaylist = data.openqueue; + } + + if (data.hasOwnProperty('playlistLock')) { + converted.openPlaylist = !data.playlistLock; + } + + if (data.chatbuffer) { + converted.chatbuffer = data.chatbuffer.map(entry => { + return { + username: entry.username, + msg: entry.msg, + meta: entry.meta || { + addClass: entry.msgclass ? entry.msgclass : undefined + }, + time: entry.time + }; + }); + } + + if (data.motd && data.motd.motd) { + converted.motd = sanitizeHTML(data.motd.motd).replace(/\n/g, '<br>\n'); + } + + if (data.opts && data.opts.customcss) { + converted.opts.externalcss = data.opts.customcss; + } + + if (data.opts && data.opts.customjs) { + converted.opts.externaljs = data.opts.customjs; + } + + if (data.filters && data.filters.length > 0 && Array.isArray(data.filters[0])) { + converted.filters = data.filters.map(filter => { + let [source, replace, active] = filter; + return { + source: source, + replace: replace, + flags: 'g', + active: active, + filterlinks: false + }; + }); + } + + return converted; +} + +function migrate(src, dest) { + return src.listChannels().then(names => { + return Promise.reduce(names, (_, name) => { + // A long time ago there was a bug where CyTube would save a different + // chandump depending on the capitalization of the channel name in the URL. + // This was fixed, but there are still some really old chandumps with + // uppercase letters in the name. + // + // If another chandump exists which is all lowercase, then that one is + // canonical. Otherwise, it's safe to load the existing capitalization, + // convert it, and save. 
+ if (name !== name.toLowerCase()) { + if (names.indexOf(name.toLowerCase()) >= 0) { + return Promise.resolve(); + } + } + + return src.load(name).then(data => { + data = fixOldChandump(data); + return dest.save(name, data); + }).then(() => { + console.log(`Migrated /r/${name}`); + }).catch(ChannelNotFoundError, err => { + console.log(`Skipping /r/${name} (not present in the database)`); + }).catch(err => { + console.error(`Failed to migrate /r/${name}: ${err.stack}`); + }); + }); + }); +} + +function main() { + Config.load('config.yaml'); + db.init(); + const src = new FileStore(); + const dest = new DatabaseStore(); + + Promise.delay(1000).then(() => { + return migrate(src, dest); + }).then(() => { + console.log('Migration complete'); + process.exit(0); + }).catch(err => { + console.error(`Migration failed: ${err.stack}`); + process.exit(1); + }); +} + +main(); diff --git a/src/channel/channel.js b/src/channel/channel.js index 8f268e80..e2e2940e 100644 --- a/src/channel/channel.js +++ b/src/channel/channel.js @@ -8,8 +8,9 @@ var fs = require("graceful-fs"); var path = require("path"); var sio = require("socket.io"); var db = require("../database"); - -const SIZE_LIMIT = 1048576; +import * as ChannelStore from '../channel-storage/channelstore'; +import { ChannelStateSizeError } from '../errors'; +import Promise from 'bluebird'; /** * Previously, async channel functions were riddled with race conditions due to @@ -150,17 +151,15 @@ Channel.prototype.getDiskSize = function (cb) { }; Channel.prototype.loadState = function () { - var self = this; - var file = path.join(__dirname, "..", "..", "chandump", self.uniqueName); - /* Don't load from disk if not registered */ - if (!self.is(Flags.C_REGISTERED)) { - self.modules.permissions.loadUnregistered(); - self.setFlag(Flags.C_READY); + if (!this.is(Flags.C_REGISTERED)) { + this.modules.permissions.loadUnregistered(); + this.setFlag(Flags.C_READY); return; } - var errorLoad = function (msg) { + const self = this; + 
function errorLoad(msg) { if (self.modules.customization) { self.modules.customization.load({ motd: msg @@ -168,100 +167,69 @@ Channel.prototype.loadState = function () { } self.setFlag(Flags.C_READY | Flags.C_ERROR); - }; + } - fs.stat(file, function (err, stats) { - if (!err) { - var mb = stats.size / 1048576; - mb = Math.floor(mb * 100) / 100; - if (mb > SIZE_LIMIT / 1048576) { - Logger.errlog.log("Large chandump detected: " + self.uniqueName + - " (" + mb + " MiB)"); - var msg = "This channel's state size has exceeded the memory limit " + - "enforced by this server. Please contact an administrator " + - "for assistance."; - errorLoad(msg); - return; - } - } - continueLoad(); - }); - - var continueLoad = function () { - fs.readFile(file, function (err, data) { - if (err) { - /* ENOENT means the file didn't exist. This is normal for new channels */ - if (err.code === "ENOENT") { - self.setFlag(Flags.C_READY); - Object.keys(self.modules).forEach(function (m) { - self.modules[m].load({}); - }); - } else { - Logger.errlog.log("Failed to open channel dump " + self.uniqueName); - Logger.errlog.log(err); - errorLoad("Unknown error occurred when loading channel state. " + - "Contact an administrator for assistance."); - } - return; - } - - self.logger.log("[init] Loading channel state from disk"); + ChannelStore.load(this.uniqueName).then(data => { + Object.keys(this.modules).forEach(m => { try { - data = JSON.parse(data); - Object.keys(self.modules).forEach(function (m) { - self.modules[m].load(data); - }); - self.setFlag(Flags.C_READY); + this.modules[m].load(data); } catch (e) { - Logger.errlog.log("Channel dump for " + self.uniqueName + " is not " + - "valid"); - Logger.errlog.log(e); - errorLoad("Unknown error occurred when loading channel state. 
Contact " + - "an administrator for assistance."); + Logger.errlog.log("Failed to load module " + m + " for channel " + + this.uniqueName); } }); - }; + this.setFlag(Flags.C_READY); + }).catch(ChannelStateSizeError, err => { + const message = "This channel's state size has exceeded the memory limit " + + "enforced by this server. Please contact an administrator " + + "for assistance."; + + Logger.errlog.log(err.stack); + errorLoad(message); + }).catch(err => { + if (err.code === 'ENOENT') { + Object.keys(this.modules).forEach(m => { + this.modules[m].load({}); + }); + this.setFlag(Flags.C_READY); + return; + } else { + const message = "An error occurred when loading this channel's data from " + + "disk. Please contact an administrator for assistance. " + + `The error was: ${err}`; + + Logger.errlog.log(err.stack); + errorLoad(message); + } + }); }; Channel.prototype.saveState = function () { - var self = this; - var file = path.join(__dirname, "..", "..", "chandump", self.uniqueName); - - /** - * Don't overwrite saved state data if the current state is dirty, - * or if this channel is unregistered - */ - if (self.is(Flags.C_ERROR) || !self.is(Flags.C_REGISTERED)) { - return; + if (!this.is(Flags.C_REGISTERED)) { + return Promise.resolve(); } - self.logger.log("[init] Saving channel state to disk"); - var data = {}; - Object.keys(this.modules).forEach(function (m) { - self.modules[m].save(data); + if (this.is(Flags.C_ERROR)) { + return Promise.reject(new Error(`Channel is in error state`)); + } + + this.logger.log("[init] Saving channel state to disk"); + const data = {}; + Object.keys(this.modules).forEach(m => { + this.modules[m].save(data); }); - var json = JSON.stringify(data); - /** - * Synchronous on purpose. - * When the server is shutting down, saveState() is called on all channels and - * then the process terminates. Async writeFile causes a race condition that wipes - * channels. 
- */ - var err = fs.writeFileSync(file, json); + return ChannelStore.save(this.uniqueName, data).catch(ChannelStateSizeError, err => { + this.users.forEach(u => { + if (u.account.effectiveRank >= 2) { + u.socket.emit("warnLargeChandump", { + limit: err.limit, + actual: err.actual + }); + } + }); - // Check for large chandump and warn moderators/admins - self.getDiskSize(function (err, size) { - if (!err && size > SIZE_LIMIT && self.users) { - self.users.forEach(function (u) { - if (u.account.effectiveRank >= 2) { - u.socket.emit("warnLargeChandump", { - limit: SIZE_LIMIT, - actual: size - }); - } - }); - } + throw err; }); }; diff --git a/src/config.js b/src/config.js index 9f53b0f6..5fa5a93b 100644 --- a/src/config.js +++ b/src/config.js @@ -110,6 +110,9 @@ var defaults = { "user": "nobody", "timeout": 15 }, + "channel-storage": { + type: "file" + } }; /** diff --git a/src/database.js b/src/database.js index c8b84852..d6c23bef 100644 --- a/src/database.js +++ b/src/database.js @@ -556,6 +556,11 @@ module.exports.listStats = function (callback) { /* Misc */ module.exports.loadAnnouncement = function () { + // Temporary workaround + if (!Server.getServer || !Server.getServer()) { + return; + } + var query = "SELECT * FROM `meta` WHERE `key`='announcement'"; module.exports.query(query, function (err, rows) { if (err) { diff --git a/src/database/tables.js b/src/database/tables.js index ab72cfd2..1591135a 100644 --- a/src/database/tables.js +++ b/src/database/tables.js @@ -104,6 +104,15 @@ const TBL_BANS = "" + "INDEX (`ip`, `channel`), INDEX (`name`, `channel`)" + ") CHARACTER SET utf8"; +const TBL_CHANNEL_DATA = "" + + "CREATE TABLE IF NOT EXISTS `channel_data` (" + + "`channel_id` INT NOT NULL," + + "`key` VARCHAR(20) NOT NULL," + + "`value` MEDIUMTEXT CHARACTER SET utf8mb4 NOT NULL," + + "PRIMARY KEY (`channel_id`, `key`)," + + "FOREIGN KEY (`channel_id`) REFERENCES `channels`(`id`) ON DELETE CASCADE" + + ") CHARACTER SET utf8"; + module.exports.init = function 
(queryfn, cb) { var tables = { users: TBL_USERS, @@ -116,7 +125,8 @@ module.exports.init = function (queryfn, cb) { user_playlists: TBL_USER_PLAYLISTS, aliases: TBL_ALIASES, stats: TBL_STATS, - meta: TBL_META + meta: TBL_META, + channel_data: TBL_CHANNEL_DATA }; var AsyncQueue = require("../asyncqueue"); diff --git a/src/errors.js b/src/errors.js new file mode 100644 index 00000000..d8ea077b --- /dev/null +++ b/src/errors.js @@ -0,0 +1,4 @@ +import createError from 'create-error'; + +export const ChannelStateSizeError = createError('ChannelStateSizeError'); +export const ChannelNotFoundError = createError('ChannelNotFoundError'); diff --git a/src/server.js b/src/server.js index 6b956b74..31e5bfbe 100644 --- a/src/server.js +++ b/src/server.js @@ -1,6 +1,8 @@ const VERSION = require("../package.json").version; var singleton = null; var Config = require("./config"); +var Promise = require("bluebird"); +import * as ChannelStore from './channel-storage/channelstore'; module.exports = { init: function () { @@ -55,6 +57,7 @@ var Server = function () { var Database = require("./database"); self.db = Database; self.db.init(); + ChannelStore.init(); // webserver init ----------------------------------------------------- self.express = express(); @@ -226,13 +229,18 @@ Server.prototype.announce = function (data) { Server.prototype.shutdown = function () { Logger.syslog.log("Unloading channels"); - for (var i = 0; i < this.channels.length; i++) { - if (this.channels[i].is(Flags.C_REGISTERED)) { - Logger.syslog.log("Saving /r/" + this.channels[i].name); - this.channels[i].saveState(); - } - } - Logger.syslog.log("Goodbye"); - process.exit(0); + Promise.map(this.channels, channel => { + return channel.saveState().tap(() => { + Logger.syslog.log(`Saved /r/${channel.name}`); + }).catch(err => { + Logger.errlog.log(`Failed to save /r/${channel.name}: ${err.stack}`); + }); + }).then(() => { + Logger.syslog.log("Goodbye"); + process.exit(0); + }).catch(err => { + 
Logger.errlog.log(`Caught error while saving channels: ${err.stack}`); + process.exit(1); + }); }; diff --git a/www/js/callbacks.js b/www/js/callbacks.js index bac504ab..84efc394 100644 --- a/www/js/callbacks.js +++ b/www/js/callbacks.js @@ -1072,8 +1072,9 @@ Callbacks = { errDialog("This channel currently exceeds the maximum size of " + toHumanReadable(data.limit) + " (channel size is " + toHumanReadable(data.actual) + "). Please reduce the size by removing " + - "unneeded playlist items, filters, and/or emotes or else the channel will " + - "be unable to load the next time it is reloaded").attr("id", "chandumptoobig"); + "unneeded playlist items, filters, and/or emotes. Changes to the channel " + + "will not be saved until the size is reduced to under the limit.") + .attr("id", "chandumptoobig"); } }