Implemented back-end for basic time-based queueing system.

This commit is contained in:
rainbow napkin 2025-01-19 15:13:31 -05:00
parent f38eae170d
commit 4f6b3318a0
11 changed files with 329 additions and 140 deletions

View file

@ -54,6 +54,9 @@ module.exports = class{
//if everything looks good, admit the connection to the channel //if everything looks good, admit the connection to the channel
socket.join(socket.chan); socket.join(socket.chan);
//Define per-channel event listeners
this.queue.defineListeners(socket);
//Hand off the connection initiation to it's user object //Hand off the connection initiation to it's user object
await userObj.handleConnection(userDB, chanDB, socket) await userObj.handleConnection(userDB, chanDB, socket)

View file

@ -23,7 +23,7 @@ const loggerUtils = require('../../utils/loggerUtils');
const csrfUtils = require('../../utils/csrfUtils'); const csrfUtils = require('../../utils/csrfUtils');
const activeChannel = require('./activeChannel'); const activeChannel = require('./activeChannel');
const chatHandler = require('./chatHandler'); const chatHandler = require('./chatHandler');
const mediaYanker = require('./media/yanker'); //const mediaYanker = require('./media/yanker');
module.exports = class{ module.exports = class{
constructor(io){ constructor(io){
@ -35,7 +35,7 @@ module.exports = class{
//Load server components //Load server components
this.chatHandler = new chatHandler(this); this.chatHandler = new chatHandler(this);
this.mediaYanker = new mediaYanker(this); //this.mediaYanker = new mediaYanker(this);
//Handle connections from socket.io //Handle connections from socket.io
io.on("connection", this.handleConnection.bind(this) ); io.on("connection", this.handleConnection.bind(this) );
@ -73,7 +73,6 @@ module.exports = class{
//Define listeners for inter-channel classes //Define listeners for inter-channel classes
this.defineListeners(socket); this.defineListeners(socket);
this.chatHandler.defineListeners(socket); this.chatHandler.defineListeners(socket);
this.mediaYanker.defineListeners(socket);
//Hand off the connection to it's given active channel object //Hand off the connection to it's given active channel object
//Lil' hacky to pass chanDB like that, but why double up on DB calls? //Lil' hacky to pass chanDB like that, but why double up on DB calls?

View file

@ -46,7 +46,7 @@ module.exports = class{
this.sendUsedTokes(userDB); this.sendUsedTokes(userDB);
//Send out the currently playing item //Send out the currently playing item
this.channel.queue.sendQueue(socket); this.channel.queue.sendMedia(socket);
//Tattoo hashed IP address to user account for seven days //Tattoo hashed IP address to user account for seven days
await userDB.tattooIPRecord(socket.handshake.address); await userDB.tattooIPRecord(socket.handshake.address);

View file

@ -16,6 +16,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.*/
//Local imports //Local imports
const queuedMedia = require('./queuedMedia'); const queuedMedia = require('./queuedMedia');
const yanker = require('../../../utils/media/yanker');
const loggerUtils = require('../../../utils/loggerUtils');
module.exports = class{ module.exports = class{
constructor(server, channel){ constructor(server, channel){
@ -23,23 +25,137 @@ module.exports = class{
this.server = server this.server = server
//Set channel //Set channel
this.channel = channel; this.channel = channel;
//Create map to hold currently queued media
this.schedule = new Map();
//Create variable to hold sync delta in ms //Create variable to hold sync delta in ms
this.syncDelta = 1000; this.syncDelta = 1000;
//Create variable to hold current timestamp within the video //Create variable to hold current timestamp within the video
this.timestamp = 0; this.timestamp = 0;
//Create variable to hold sync timer //Create variable to hold sync timer
this.syncTimer = null; this.syncTimer = null;
//Create variable to hold next playing item timer
this.nextTimer = null;
//Create variable to hold currently playing media object //Create variable to hold currently playing media object
this.nowPlaying = null; this.nowPlaying = null;
} }
queueMedia(inputMedia){ defineListeners(socket){
//Create new media object, set start time to now socket.on("queue", (data) => {this.queueURL(socket, data)});
const mediaObj = queuedMedia.fromMedia(inputMedia, new Date().getTime()); }
//Start playback
this.start(mediaObj); async queueURL(socket, data){
try{
//pull URL and start time from data
let {url, start} = data;
//Pull media list
const mediaList = await yanker.yankMedia(url);
//Queue the first media object given
this.queueMedia(mediaList[0], start, socket);
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
//Default start time to now + half a second to give everyone time to process shit
queueMedia(inputMedia, start = new Date().getTime() + 50, socket){
//Create a new media queued object, set start time to now
const mediaObj = queuedMedia.fromMedia(inputMedia, start);
//schedule the media
this.scheduleMedia(mediaObj, socket);
//Refresh the next timer to ensure whatever comes on next is right
this.refreshNextTimer();
}
refreshNextTimer(){
//Grab the next item
const nextItem = this.getNextItem();
//If we have no next item
if(nextItem == null){
//Fuck off and die
return;
}
//Calculate the amount of time in ms that the next item will start in
const startsIn = nextItem.startTime - new Date().getTime();
//Clear out any item that might be up next
clearTimeout(this.nextTimer);
//Set the next timer
this.nextTimer = setTimeout(()=>{this.start(nextItem)}, startsIn);
}
scheduleMedia(mediaObj, socket){
/* This is a fun method and I think it deserves it's own little explination...
Since we're working with a time based schedule, using start epochs as keys for our iterable seemed the best option
I don't want to store everything in a sparse array because that *feels* icky, and would probably be a pain in the ass.
Maps seem like a good choice, if it wheren't for the issue of keeping them ordered...
That's where this comes in. You see if we temporarily store it in a sparse array and convert into a map,
we can quickly and easily create a properly sorted schedule map that, out side of adding items, behaves normally.
Also a note on preformance:
While .forEach ONLY runs through populated items in sparse arrays, many JS implementations run through them in the background,
simply skipping them before executing the provided function. Looping through object.keys(arr), however, avoids this entirely,
since it ONLY loops through defiened items within the array. No skipped empties for your runtime to worry about.
Even more preformance benefits can be had by using a real for loop on the arrays keys, skipping the overhead of forEach entirely.
This might seem gross but it completely avoids the computational workload of a sorting algo, especially when you consider
that, no matter what, re-ordering the schedule map would've required us to iterate through and convert it to an array and back anyways...
Also it looks like due to implementation limitations, epochs stored as MS are too large for array elements, so we store them as seconds.
This also means that our current implementation will break exactly on unix epoch 4294967295 (Feb 7, 2106 6:28:15 AM UTC)
Hopefully javascript arrays will allow for larger lengths by then. If not blame the W3C :P
If for some reason they haven't we could probably implement an object that wraps a 2d array and set/gets it using modulo/devision/multiplication
Further Reading:
https://stackoverflow.com/questions/59480871/foreach-vs-object-keys-foreach-performance-on-sparse-arrays
https://community.appsmith.com/content/blog/dark-side-foreach-why-you-should-think-twice-using-it
*/
//If there's already something queued right now
if(this.getItemAtEpoch(mediaObj.startTime) != null){
//If an originating socket was provided for this request
if(socket != null){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "This time slot has already been taken in the queue!", "queue");
}
//Ignore it
return;
}
//Create an empty temp array to sparsley populate with our schedule
const tempSchedule = [];
//Create new map to replace our current schedule map
const newSchedule = new Map();
//For every item that's already been scheduled
for(let item of this.schedule){
//add it to the slot corresponding to it's start epoch in seconds
tempSchedule[Math.round(item[0] / 1000)] = item[1];
}
//Inject the media object into the slot corresponding to it's epoch in the temp schedule array
tempSchedule[Math.round(mediaObj.startTime / 1000)] = mediaObj;
//For every populated key in our array
for(let startTime of Object.keys(tempSchedule)){
//Add item to replacement schedule map
newSchedule.set(tempSchedule[startTime].startTime, tempSchedule[startTime]);
}
//Replace the existing schedule map with our new one
this.schedule = newSchedule;
} }
start(mediaObj){ start(mediaObj){
@ -53,10 +169,13 @@ module.exports = class{
this.nowPlaying = mediaObj; this.nowPlaying = mediaObj;
//Send play signal out to the channel //Send play signal out to the channel
this.sendQueue(); this.sendMedia();
//Kick off the sync timer //Kick off the sync timer
this.syncTimer = setTimeout(this.sync.bind(this), this.syncDelta); this.syncTimer = setTimeout(this.sync.bind(this), this.syncDelta);
//Setup the next video
this.refreshNextTimer();
} }
sync(){ sync(){
@ -99,7 +218,56 @@ module.exports = class{
} }
} }
sendQueue(socket){ getItemAtEpoch(epoch = new Date().getTime()){
//Loop through scheduled items
for(let item of this.schedule){
//If we're past or at the start time and at or before the end time
if(item[0] <= epoch && item[1].getEndTime() >= epoch){
//return the current item
return item[1]
}
}
//If we fell through the loop return null
return null;
}
getLastItem(epoch = new Date().getTime()){
//Create variable to hold the last item
let last;
//Loop through scheduled items
for(let item of this.schedule){
//If we've stumbled on to the next item
if(item[0] >= epoch){
//Break the loop
break;
//If we've stumbled upon an item that is currently playing
}else if(item[1].getEndTime() >= epoch){
//Break the loop
break;
//If we made it through this iteration without breaking the loop
}
//Set current item to last item
last = item[1];
}
//If the loop has been broken or fallen through, return last.
return last;
}
getNextItem(epoch = new Date().getTime()){
//Iterate through the schedule
for(let item of this.schedule){
if(item[0] >= epoch){
//Pull the scheduled media object from the map entry array
return item[1];
}
}
}
sendMedia(socket){
//Create data object //Create data object
const data = { const data = {
media: this.nowPlaying, media: this.nowPlaying,

View file

@ -40,4 +40,7 @@ module.exports = class extends media{
this.uuid = crypto.randomUUID(); this.uuid = crypto.randomUUID();
} }
getEndTime(){
return this.startTime + (this.duration * 1000);
}
} }

View file

@ -1,101 +0,0 @@
/*Canopy - The next generation of stoner streaming software
Copyright (C) 2024-2025 Rainbownapkin and the TTN Community
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.*/
//NPM Imports
const validator = require('validator');//No express here, so regular validator it is!
//local import
const loggerUtils = require('../../../utils/loggerUtils');
const iaUtil = require('../../../utils/media/internetArchiveUtils');
const media = require('./media');
module.exports = class{
constructor(server){
this.server = server;
}
defineListeners(socket){
socket.on("yank", (data) => {this.testYank(socket, data)});
socket.on("play", (data) => {this.testPlay(socket, data)});
}
async testYank(socket, data){
try{
console.log(await this.yankMedia(data.url));
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
async testPlay(socket, data){
try{
//Pull media list
const mediaList = await this.yankMedia(data.url);
//Get active channel from server/socket
const chan = this.server.activeChannels.get(socket.chan);
//Queue the first media object given
chan.queue.queueMedia(mediaList[0]);
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
async yankMedia(url){
const pullType = await this.getMediaType(url);
if(pullType == 'ia'){
//Create empty list to hold media objects
const mediaList = [];
//Pull metadata from IA
const mediaInfo = await iaUtil.fetchMetadata(url);
//for every compatible and relevant file returned from IA
for(let file of mediaInfo.files){
//Split file path by directories
const path = file.name.split('/');
//pull filename from path
const name = path[path.length - 1];
//Construct link from pulled info
const link = `https://archive.org/download/${mediaInfo.metadata.identifier}/${file.name}`;
//Create new media object from file info
mediaList.push(new media(name, name, link, 'ia', Number(file.length)));
}
//return media object list
return mediaList;
}else{
//return null to signify a bad url
return null;
}
}
async getMediaType(url){
//Check if we have a valid url
if(!validator.isURL(url)){
//If not toss the fucker out
return null;
}
//If we have link to a resource from archive.org
if(url.match(/^https\:\/\/archive.org\//g)){
//return internet archive code
return "ia";
}
return null;
}
}

View file

@ -37,6 +37,10 @@ module.exports.exceptionHandler = function(res, err){
module.exports.errorHandler(res, err.message, "Caught Exception"); module.exports.errorHandler(res, err.message, "Caught Exception");
} }
module.exports.socketErrorHandler = function(socket, msg, type = "Generic"){
return socket.emit("error", {errors: [{type, msg, date: new Date()}]});
}
module.exports.socketExceptionHandler = function(socket, err){ module.exports.socketExceptionHandler = function(socket, err){
//If we're being verbose //If we're being verbose
if(config.verbose){ if(config.verbose){
@ -45,7 +49,7 @@ module.exports.socketExceptionHandler = function(socket, err){
} }
//if not yell at the browser for fucking up, and tell it what it did wrong. //if not yell at the browser for fucking up, and tell it what it did wrong.
return socket.emit("error", {errors: [{type: "Caught Exception", msg: err.message, date: new Date()}]}); return module.exports.socketErrorHandler(socket, err.msg, "Caught Exception");
} }
module.exports.socketCriticalExceptionHandler = function(socket, err){ module.exports.socketCriticalExceptionHandler = function(socket, err){

68
src/utils/media/yanker.js Normal file
View file

@ -0,0 +1,68 @@
/*Canopy - The next generation of stoner streaming software
Copyright (C) 2024-2025 Rainbownapkin and the TTN Community
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.*/
//NPM Imports
const validator = require('validator');//No express here, so regular validator it is!
//local import
const iaUtil = require('./internetArchiveUtils');
const media = require('../../app/channel/media/media');
module.exports.yankMedia = async function(url){
const pullType = await this.getMediaType(url);
if(pullType == 'ia'){
//Create empty list to hold media objects
const mediaList = [];
//Pull metadata from IA
const mediaInfo = await iaUtil.fetchMetadata(url);
//for every compatible and relevant file returned from IA
for(let file of mediaInfo.files){
//Split file path by directories
const path = file.name.split('/');
//pull filename from path
const name = path[path.length - 1];
//Construct link from pulled info
const link = `https://archive.org/download/${mediaInfo.metadata.identifier}/${file.name}`;
//Create new media object from file info
mediaList.push(new media(name, name, link, 'ia', Number(file.length)));
}
//return media object list
return mediaList;
}else{
//return null to signify a bad url
return null;
}
}
module.exports.getMediaType = async function(url){
//Check if we have a valid url
if(!validator.isURL(url)){
//If not toss the fucker out
return null;
}
//If we have link to a resource from archive.org
if(url.match(/^https\:\/\/archive.org\//g)){
//return internet archive code
return "ia";
}
return null;
}

View file

@ -22,8 +22,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>. %>
<i title="Synchronize" class="media-panel panel-head-element bi-arrow-repeat" id="media-panel-sync-icon"></i> <i title="Synchronize" class="media-panel panel-head-element bi-arrow-repeat" id="media-panel-sync-icon"></i>
<i title="Lock Chat Size to Video Aspect Ratio" class="media-panel panel-head-element bi-aspect-ratio-fill" id="media-panel-aspect-lock-icon"></i> <i title="Lock Chat Size to Video Aspect Ratio" class="media-panel panel-head-element bi-aspect-ratio-fill" id="media-panel-aspect-lock-icon"></i>
<i title="Cinema Mode" class="media-panel panel-head-element bi-film" id="media-panel-cinema-mode-icon"></i> <i title="Cinema Mode" class="media-panel panel-head-element bi-film" id="media-panel-cinema-mode-icon"></i>
<i title="Horizontal Flip" class="media-panel panel-head-element bi-arrows-vertical" id="media-panel-flip-vertical-icon"></i> <i title="Vertical Flip" class="media-panel panel-head-element bi-arrows-vertical" id="media-panel-flip-vertical-icon"></i>
<i title="Vertical Flip" class="media-panel panel-head-element bi-arrows" id="media-panel-flip-horizontal-icon"></i> <i title="Horizontal Flip" class="media-panel panel-head-element bi-arrows" id="media-panel-flip-horizontal-icon"></i>
<i title="Reload Media" class="media-panel panel-head-element bi-arrow-clockwise" id="media-panel-reload-icon"></i> <i title="Reload Media" class="media-panel panel-head-element bi-arrow-clockwise" id="media-panel-reload-icon"></i>
<i title="Show Chat" class="media-panel panel-head-element bi-chat-right-dots-fill" id="media-panel-show-chat-icon"></i> <i title="Show Chat" class="media-panel panel-head-element bi-chat-right-dots-fill" id="media-panel-show-chat-icon"></i>
</div> </div>

View file

@ -15,11 +15,15 @@ You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.*/ along with this program. If not, see <https://www.gnu.org/licenses/>.*/
class mediaHandler{ class mediaHandler{
constructor(client, player, media){ constructor(client, player, media, type){
//Get parents //Get parents
this.client = client; this.client = client;
this.player = player; this.player = player;
//Set handler type
this.type = type
//Set last received timestamp to 0
this.lastTimestamp = 0; this.lastTimestamp = 0;
//Ingest media object from server //Ingest media object from server
@ -122,7 +126,20 @@ class mediaHandler{
class nullHandler extends mediaHandler{ class nullHandler extends mediaHandler{
constructor(client, player){ constructor(client, player){
//Call derived constructor //Call derived constructor
super(client, player, {}); super(client, player, {}, null);
this.defineListeners();
}
defineListeners(){
//Disable right clicking
this.video.addEventListener('contextmenu', (e)=>{e.preventDefault()});
this.video.addEventListener('loadedmetadata', this.onMetadataLoad.bind(this));
}
onMetadataLoad(event){
//Resize aspect (if locked), since the video doesn't properly report it's resolution until it's been loaded
this.client.chatBox.resizeAspect();
} }
start(){ start(){
@ -143,16 +160,18 @@ class nullHandler extends mediaHandler{
class rawFileHandler extends mediaHandler{ class rawFileHandler extends mediaHandler{
constructor(client, player, media){ constructor(client, player, media){
//Call derived constructor //Call derived constructor
super(client, player, media); super(client, player, media, 'raw');
//Since this media type has no way to tell between the user and code seek events, we need a flag to mark them //Since this media type has no way to tell between events that originate from either the user or code
this.selfSeek = false; //That's what this boolean is for :P
this.selfAct = false;
//Define listeners //Define listeners
this.defineListeners(); this.defineListeners();
} }
defineListeners(){ defineListeners(){
this.video.addEventListener('loadedmetadata', this.onMetadataLoad.bind(this));
this.video.addEventListener('pause', this.onPause.bind(this)); this.video.addEventListener('pause', this.onPause.bind(this));
this.video.addEventListener('seeking', this.onSeek.bind(this)); this.video.addEventListener('seeking', this.onSeek.bind(this));
} }
@ -180,37 +199,56 @@ class rawFileHandler extends mediaHandler{
} }
sync(timestamp = this.lastTimestamp){ sync(timestamp = this.lastTimestamp){
//Set self seek flag //Skip sync calls that won't seek so we don't pointlessly throw selfAct
this.selfSeek = true; if(timestamp != this.video.currentTime){
//Set self act flag
this.selfAct = true;
//Set current video time based on timestamp received from server //Set current video time based on timestamp received from server
this.video.currentTime = timestamp; this.video.currentTime = timestamp;
}
} }
reload(){ reload(){
//Throw self seek flag to make sure we don't un-sync the player //Throw self act flag to make sure we don't un-sync the player
this.selfSeek = true; this.selfAct = true;
//Call derived reload function //Call derived reload function
super.reload(); super.reload();
} }
onMetadataLoad(event){
//Resize aspect (if locked), since the video doesn't properly report it's resolution until it's been loaded
this.client.chatBox.resizeAspect();
}
onPause(event){ onPause(event){
this.player.unlockSync(); //If the video was paused out-side of code
if(!this.selfAct){
this.player.unlockSync();
}
this.selfAct = false;
} }
onSeek(event){ onSeek(event){
//If the video was seeked out-side of code //If the video was seeked out-side of code
if(!this.selfSeek){ if(!this.selfAct){
this.player.unlockSync(); this.player.unlockSync();
} }
//reset self seek flag //reset self act flag
this.selfSeek = false; this.selfAct = false;
} }
getTimestamp(){ getTimestamp(){
//Return current timestamp //Return current timestamp
return this.video.currentTime; return this.video.currentTime;
} }
end(){
//Throw self act to prevent unlock on video end
this.selfAct = true;
super.end();
}
} }

View file

@ -41,13 +41,12 @@ class player{
this.reloadIcon = document.querySelector("#media-panel-reload-icon"); this.reloadIcon = document.querySelector("#media-panel-reload-icon");
//Numbers //Numbers
this.syncTolerance = 1; this.syncTolerance = 0.4;
this.syncDelta = 6; this.syncDelta = 6;
//run setup functions //run setup functions
this.setupInput(); this.setupInput();
this.defineListeners(); this.defineListeners();
this.lockSync();
} }
setupInput(){ setupInput(){
@ -97,6 +96,9 @@ class player{
} }
} }
//Lock synchronization since everyone starts at 0, and update the UI
this.lockSync();
//Re-size to aspect since video may now be a different size //Re-size to aspect since video may now be a different size
this.client.chatBox.resizeAspect(); this.client.chatBox.resizeAspect();
} }
@ -133,27 +135,32 @@ class player{
//Replace it with a null handler //Replace it with a null handler
this.mediaHandler = new nullHandler(client, this); this.mediaHandler = new nullHandler(client, this);
//Re-lock sync since we're probably gonna start new media soon anywho, and we need to update the UI anywho
this.lockSync();
} }
lockSync(){ lockSync(){
//Light up the sync icon to show that we're actively synchronized
this.syncIcon.classList.add('positive');
//Enable syncing //Enable syncing
this.syncLock = true; this.syncLock = true;
//If we have a media handler if(this.mediaHandler != null && this.mediaHandler.type != null){
if(this.mediaHandler != null){ //Light up the sync icon to show that we're actively synchronized
this.syncIcon.classList.add('positive');
//Sync to last timestamp //Sync to last timestamp
this.mediaHandler.sync(); this.mediaHandler.sync();
//Play //Play
this.mediaHandler.play(); this.mediaHandler.play();
}else{
//Unlight the sync icon since there is nothing to sync
this.syncIcon.classList.remove('positive');
} }
} }
unlockSync(){ unlockSync(){
//Unlight the sync icon //Unlight the sync icon since we're no longer actively synced
this.syncIcon.classList.remove('positive'); this.syncIcon.classList.remove('positive');
//Disable syncing //Disable syncing