Add experimental feature to reduce database writes for channel data

This commit is contained in:
Calvin Montgomery 2017-12-10 10:36:28 -08:00
parent a9062159ed
commit c4cc22dd05
12 changed files with 190 additions and 61 deletions

View file

@ -1,10 +1,19 @@
import Promise from 'bluebird';
import { ChannelStateSizeError } from '../errors';
import db from '../database';
import { Counter } from 'prom-client';
const LOGGER = require('@calzoneman/jsli')('dbstore');
const SIZE_LIMIT = 1048576;
const QUERY_CHANNEL_DATA = 'SELECT `key`, `value` FROM channel_data WHERE channel_id = ?';
const loadRowcount = new Counter({
name: 'cytube_channel_db_load_rows_total',
help: 'Total rows loaded from the channel_data table'
});
const saveRowcount = new Counter({
name: 'cytube_channel_db_save_rows_total',
help: 'Total rows saved in the channel_data table'
});
function queryAsync(query, substitutions) {
return new Promise((resolve, reject) => {
@ -39,6 +48,8 @@ export class DatabaseStore {
}
return queryAsync(QUERY_CHANNEL_DATA, [id]).then(rows => {
loadRowcount.inc(rows.length);
const data = {};
rows.forEach(row => {
try {
@ -53,35 +64,47 @@ export class DatabaseStore {
});
}
save(id, channelName, data) {
async save(id, channelName, data) {
if (!id || id === 0) {
return Promise.reject(new Error(`Cannot save state for [${channelName}]: ` +
`id was passed as [${id}]`));
throw new Error(
`Cannot save state for [${channelName}]: ` +
`id was passed as [${id}]`
);
}
let totalSize = 0;
let rowCount = 0;
const substitutions = [];
for (const key in data) {
if (typeof data[key] === 'undefined') {
continue;
}
rowCount++;
const value = JSON.stringify(data[key]);
totalSize += value.length;
substitutions.push(id);
substitutions.push(key);
substitutions.push(value);
}
if (rowCount === 0) {
return;
}
saveRowcount.inc(rowCount);
if (totalSize > SIZE_LIMIT) {
return Promise.reject(new ChannelStateSizeError(
throw new ChannelStateSizeError(
'Channel state size is too large', {
limit: SIZE_LIMIT,
actual: totalSize
}));
});
}
return queryAsync(buildUpdateQuery(rowCount), substitutions);
return await queryAsync(buildUpdateQuery(rowCount), substitutions);
}
}