From 4f6b3318a0b79fde87f9f6fedd8209c834354b6c Mon Sep 17 00:00:00 2001 From: rainbow napkin Date: Sun, 19 Jan 2025 15:13:31 -0500 Subject: [PATCH] Implemented back-end for basic time-based queueing system. --- src/app/channel/activeChannel.js | 3 + src/app/channel/channelManager.js | 5 +- src/app/channel/connectedUser.js | 2 +- src/app/channel/media/queue.js | 184 ++++++++++++++++++++++- src/app/channel/media/queuedMedia.js | 3 + src/app/channel/media/yanker.js | 101 ------------- src/utils/loggerUtils.js | 6 +- src/utils/media/yanker.js | 68 +++++++++ src/views/partial/channel/mediaPanel.ejs | 4 +- www/js/channel/mediaHandler.js | 70 +++++++-- www/js/channel/player.js | 23 ++- 11 files changed, 329 insertions(+), 140 deletions(-) delete mode 100644 src/app/channel/media/yanker.js create mode 100644 src/utils/media/yanker.js diff --git a/src/app/channel/activeChannel.js b/src/app/channel/activeChannel.js index 336a52c..7eb1091 100644 --- a/src/app/channel/activeChannel.js +++ b/src/app/channel/activeChannel.js @@ -54,6 +54,9 @@ module.exports = class{ //if everything looks good, admit the connection to the channel socket.join(socket.chan); + //Define per-channel event listeners + this.queue.defineListeners(socket); + //Hand off the connection initiation to it's user object await userObj.handleConnection(userDB, chanDB, socket) diff --git a/src/app/channel/channelManager.js b/src/app/channel/channelManager.js index 799df1a..d0d4eb6 100644 --- a/src/app/channel/channelManager.js +++ b/src/app/channel/channelManager.js @@ -23,7 +23,7 @@ const loggerUtils = require('../../utils/loggerUtils'); const csrfUtils = require('../../utils/csrfUtils'); const activeChannel = require('./activeChannel'); const chatHandler = require('./chatHandler'); -const mediaYanker = require('./media/yanker'); +//const mediaYanker = require('./media/yanker'); module.exports = class{ constructor(io){ @@ -35,7 +35,7 @@ module.exports = class{ //Load server components this.chatHandler = new chatHandler(this); - this.mediaYanker = new mediaYanker(this); + //this.mediaYanker = new mediaYanker(this); //Handle connections from socket.io io.on("connection", this.handleConnection.bind(this) ); @@ -73,7 +73,6 @@ module.exports = class{ //Define listeners for inter-channel classes this.defineListeners(socket); this.chatHandler.defineListeners(socket); - this.mediaYanker.defineListeners(socket); //Hand off the connection to it's given active channel object //Lil' hacky to pass chanDB like that, but why double up on DB calls? diff --git a/src/app/channel/connectedUser.js b/src/app/channel/connectedUser.js index f239aac..431c10a 100644 --- a/src/app/channel/connectedUser.js +++ b/src/app/channel/connectedUser.js @@ -46,7 +46,7 @@ module.exports = class{ this.sendUsedTokes(userDB); //Send out the currently playing item - this.channel.queue.sendQueue(socket); + this.channel.queue.sendMedia(socket); //Tattoo hashed IP address to user account for seven days await userDB.tattooIPRecord(socket.handshake.address); diff --git a/src/app/channel/media/queue.js b/src/app/channel/media/queue.js index de989ca..27ebf38 100644 --- a/src/app/channel/media/queue.js +++ b/src/app/channel/media/queue.js @@ -16,6 +16,8 @@ along with this program. If not, see .*/ //Local imports const queuedMedia = require('./queuedMedia'); +const yanker = require('../../../utils/media/yanker'); +const loggerUtils = require('../../../utils/loggerUtils'); module.exports = class{ constructor(server, channel){ @@ -23,23 +25,137 @@ module.exports = class{ this.server = server //Set channel this.channel = channel; + + //Create map to hold currently queued media + this.schedule = new Map(); + //Create variable to hold sync delta in ms this.syncDelta = 1000; //Create variable to hold current timestamp within the video this.timestamp = 0; + //Create variable to hold sync timer this.syncTimer = null; + //Create variable to hold next playing item timer + this.nextTimer = null; //Create variable to hold currently playing media object this.nowPlaying = null; - } - queueMedia(inputMedia){ - //Create new media object, set start time to now - const mediaObj = queuedMedia.fromMedia(inputMedia, new Date().getTime()); + defineListeners(socket){ + socket.on("queue", (data) => {this.queueURL(socket, data)}); + } - //Start playback - this.start(mediaObj); + + async queueURL(socket, data){ + try{ + //pull URL and start time from data + let {url, start} = data; + + //Pull media list + const mediaList = await yanker.yankMedia(url); + + //Queue the first media object given + this.queueMedia(mediaList[0], start, socket); + }catch(err){ + return loggerUtils.socketExceptionHandler(socket, err); + } + } + + //Default start time to now + half a second to give everyone time to process shit + queueMedia(inputMedia, start = new Date().getTime() + 50, socket){ + //Create a new media queued object, set start time to now + const mediaObj = queuedMedia.fromMedia(inputMedia, start); + + //schedule the media + this.scheduleMedia(mediaObj, socket); + + //Refresh the next timer to ensure whatever comes on next is right + this.refreshNextTimer(); + } + + refreshNextTimer(){ + //Grab the next item + const nextItem = this.getNextItem(); + + //If we have no next item + if(nextItem == null){ + //Fuck off and die + return; + } + + //Calculate the amount of time in ms that the next item will start in + const startsIn = nextItem.startTime - new Date().getTime(); + + //Clear out any item that might be up next + clearTimeout(this.nextTimer); + //Set the next timer + this.nextTimer = setTimeout(()=>{this.start(nextItem)}, startsIn); + } + + scheduleMedia(mediaObj, socket){ + /* This is a fun method and I think it deserves it's own little explination... + Since we're working with a time based schedule, using start epochs as keys for our iterable seemed the best option + I don't want to store everything in a sparse array because that *feels* icky, and would probably be a pain in the ass. + Maps seem like a good choice, if it wheren't for the issue of keeping them ordered... + + That's where this comes in. You see if we temporarily store it in a sparse array and convert into a map, + we can quickly and easily create a properly sorted schedule map that, out side of adding items, behaves normally. + + Also a note on preformance: + While .forEach ONLY runs through populated items in sparse arrays, many JS implementations run through them in the background, + simply skipping them before executing the provided function. Looping through object.keys(arr), however, avoids this entirely, + since it ONLY loops through defiened items within the array. No skipped empties for your runtime to worry about. + Even more preformance benefits can be had by using a real for loop on the arrays keys, skipping the overhead of forEach entirely. + This might seem gross but it completely avoids the computational workload of a sorting algo, especially when you consider + that, no matter what, re-ordering the schedule map would've required us to iterate through and convert it to an array and back anyways... + + + Also it looks like due to implementation limitations, epochs stored as MS are too large for array elements, so we store them as seconds. + This also means that our current implementation will break exactly on unix epoch 4294967295 (Feb 7, 2106 6:28:15 AM UTC) + Hopefully javascript arrays will allow for larger lengths by then. If not blame the W3C :P + + If for some reason they haven't we could probably implement an object that wraps a 2d array and set/gets it using modulo/devision/multiplication + + Further Reading: + https://stackoverflow.com/questions/59480871/foreach-vs-object-keys-foreach-performance-on-sparse-arrays + https://community.appsmith.com/content/blog/dark-side-foreach-why-you-should-think-twice-using-it + */ + + //If there's already something queued right now + if(this.getItemAtEpoch(mediaObj.startTime) != null){ + //If an originating socket was provided for this request + if(socket != null){ + //Yell at the user for being an asshole + loggerUtils.socketErrorHandler(socket, "This time slot has already been taken in the queue!", "queue"); + } + //Ignore it + return; + } + + + //Create an empty temp array to sparsley populate with our schedule + const tempSchedule = []; + //Create new map to replace our current schedule map + const newSchedule = new Map(); + + //For every item that's already been scheduled + for(let item of this.schedule){ + //add it to the slot corresponding to it's start epoch in seconds + tempSchedule[Math.round(item[0] / 1000)] = item[1]; + } + + //Inject the media object into the slot corresponding to it's epoch in the temp schedule array + tempSchedule[Math.round(mediaObj.startTime / 1000)] = mediaObj; + + //For every populated key in our array + for(let startTime of Object.keys(tempSchedule)){ + //Add item to replacement schedule map + newSchedule.set(tempSchedule[startTime].startTime, tempSchedule[startTime]); + } + + //Replace the existing schedule map with our new one + this.schedule = newSchedule; } start(mediaObj){ @@ -53,10 +169,13 @@ module.exports = class{ this.nowPlaying = mediaObj; //Send play signal out to the channel - this.sendQueue(); + this.sendMedia(); //Kick off the sync timer this.syncTimer = setTimeout(this.sync.bind(this), this.syncDelta); + + //Setup the next video + this.refreshNextTimer(); } sync(){ @@ -99,7 +218,56 @@ module.exports = class{ } } - sendQueue(socket){ + getItemAtEpoch(epoch = new Date().getTime()){ + //Loop through scheduled items + for(let item of this.schedule){ + //If we're past or at the start time and at or before the end time + if(item[0] <= epoch && item[1].getEndTime() >= epoch){ + //return the current item + return item[1] + } + } + + //If we fell through the loop return null + return null; + } + + getLastItem(epoch = new Date().getTime()){ + //Create variable to hold the last item + let last; + + //Loop through scheduled items + for(let item of this.schedule){ + //If we've stumbled on to the next item + if(item[0] >= epoch){ + //Break the loop + break; + //If we've stumbled upon an item that is currently playing + }else if(item[1].getEndTime() >= epoch){ + //Break the loop + break; + //If we made it through this iteration without breaking the loop + } + + //Set current item to last item + last = item[1]; + } + + //If the loop has been broken or fallen through, return last. + return last; + } + + getNextItem(epoch = new Date().getTime()){ + //Iterate through the schedule + for(let item of this.schedule){ + if(item[0] >= epoch){ + //Pull the scheduled media object from the map entry array + return item[1]; + } + } + } + + sendMedia(socket){ //Create data object const data = { media: this.nowPlaying, diff --git a/src/app/channel/media/queuedMedia.js b/src/app/channel/media/queuedMedia.js index 1c562dd..4dc015d 100644 --- a/src/app/channel/media/queuedMedia.js +++ b/src/app/channel/media/queuedMedia.js @@ -40,4 +40,7 @@ module.exports = class extends media{ this.uuid = crypto.randomUUID(); } + getEndTime(){ + return this.startTime + (this.duration * 1000); + } } \ No newline at end of file diff --git a/src/app/channel/media/yanker.js b/src/app/channel/media/yanker.js deleted file mode 100644 index e8a0ae5..0000000 --- a/src/app/channel/media/yanker.js +++ /dev/null @@ -1,101 +0,0 @@ -/*Canopy - The next generation of stoner streaming software -Copyright (C) 2024-2025 Rainbownapkin and the TTN Community - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as -published by the Free Software Foundation, either version 3 of the -License, or (at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see .*/ - -//NPM Imports -const validator = require('validator');//No express here, so regular validator it is! - -//local import -const loggerUtils = require('../../../utils/loggerUtils'); -const iaUtil = require('../../../utils/media/internetArchiveUtils'); -const media = require('./media'); - -module.exports = class{ - constructor(server){ - this.server = server; - } - - defineListeners(socket){ - socket.on("yank", (data) => {this.testYank(socket, data)}); - socket.on("play", (data) => {this.testPlay(socket, data)}); - } - - async testYank(socket, data){ - try{ - console.log(await this.yankMedia(data.url)); - }catch(err){ - return loggerUtils.socketExceptionHandler(socket, err); - } - } - - async testPlay(socket, data){ - try{ - //Pull media list - const mediaList = await this.yankMedia(data.url); - //Get active channel from server/socket - const chan = this.server.activeChannels.get(socket.chan); - //Queue the first media object given - chan.queue.queueMedia(mediaList[0]); - }catch(err){ - return loggerUtils.socketExceptionHandler(socket, err); - } - } - - async yankMedia(url){ - const pullType = await this.getMediaType(url); - - if(pullType == 'ia'){ - //Create empty list to hold media objects - const mediaList = []; - //Pull metadata from IA - const mediaInfo = await iaUtil.fetchMetadata(url); - - //for every compatible and relevant file returned from IA - for(let file of mediaInfo.files){ - //Split file path by directories - const path = file.name.split('/'); - //pull filename from path - const name = path[path.length - 1]; - //Construct link from pulled info - const link = `https://archive.org/download/${mediaInfo.metadata.identifier}/${file.name}`; - - //Create new media object from file info - mediaList.push(new media(name, name, link, 'ia', Number(file.length))); - } - - //return media object list - return mediaList; - }else{ - //return null to signify a bad url - return null; - } - } - - async getMediaType(url){ - //Check if we have a valid url - if(!validator.isURL(url)){ - //If not toss the fucker out - return null; - } - - //If we have link to a resource from archive.org - if(url.match(/^https\:\/\/archive.org\//g)){ - //return internet archive code - return "ia"; - } - - return null; - } -} \ No newline at end of file diff --git a/src/utils/loggerUtils.js b/src/utils/loggerUtils.js index efa3239..a82b336 100644 --- a/src/utils/loggerUtils.js +++ b/src/utils/loggerUtils.js @@ -37,6 +37,10 @@ module.exports.exceptionHandler = function(res, err){ module.exports.errorHandler(res, err.message, "Caught Exception"); } +module.exports.socketErrorHandler = function(socket, msg, type = "Generic"){ + return socket.emit("error", {errors: [{type, msg, date: new Date()}]}); +} + module.exports.socketExceptionHandler = function(socket, err){ //If we're being verbose if(config.verbose){ @@ -45,7 +49,7 @@ module.exports.socketExceptionHandler = function(socket, err){ } //if not yell at the browser for fucking up, and tell it what it did wrong. - return socket.emit("error", {errors: [{type: "Caught Exception", msg: err.message, date: new Date()}]}); + return module.exports.socketErrorHandler(socket, err.msg, "Caught Exception"); } module.exports.socketCriticalExceptionHandler = function(socket, err){ diff --git a/src/utils/media/yanker.js b/src/utils/media/yanker.js new file mode 100644 index 0000000..790b5f2 --- /dev/null +++ b/src/utils/media/yanker.js @@ -0,0 +1,68 @@ +/*Canopy - The next generation of stoner streaming software +Copyright (C) 2024-2025 Rainbownapkin and the TTN Community + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +//NPM Imports +const validator = require('validator');//No express here, so regular validator it is! + +//local import +const iaUtil = require('./internetArchiveUtils'); +const media = require('../../app/channel/media/media'); + +module.exports.yankMedia = async function(url){ + const pullType = await this.getMediaType(url); + + if(pullType == 'ia'){ + //Create empty list to hold media objects + const mediaList = []; + //Pull metadata from IA + const mediaInfo = await iaUtil.fetchMetadata(url); + + //for every compatible and relevant file returned from IA + for(let file of mediaInfo.files){ + //Split file path by directories + const path = file.name.split('/'); + //pull filename from path + const name = path[path.length - 1]; + //Construct link from pulled info + const link = `https://archive.org/download/${mediaInfo.metadata.identifier}/${file.name}`; + + //Create new media object from file info + mediaList.push(new media(name, name, link, 'ia', Number(file.length))); + } + + //return media object list + return mediaList; + }else{ + //return null to signify a bad url + return null; + } +} + +module.exports.getMediaType = async function(url){ + //Check if we have a valid url + if(!validator.isURL(url)){ + //If not toss the fucker out + return null; + } + + //If we have link to a resource from archive.org + if(url.match(/^https\:\/\/archive.org\//g)){ + //return internet archive code + return "ia"; + } + + return null; +} \ No newline at end of file diff --git a/src/views/partial/channel/mediaPanel.ejs b/src/views/partial/channel/mediaPanel.ejs index 3b50c8d..9f8ee2f 100644 --- a/src/views/partial/channel/mediaPanel.ejs +++ b/src/views/partial/channel/mediaPanel.ejs @@ -22,8 +22,8 @@ along with this program. If not, see . %> - - + + diff --git a/www/js/channel/mediaHandler.js b/www/js/channel/mediaHandler.js index f22914a..fcfbb76 100644 --- a/www/js/channel/mediaHandler.js +++ b/www/js/channel/mediaHandler.js @@ -15,11 +15,15 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see .*/ class mediaHandler{ - constructor(client, player, media){ + constructor(client, player, media, type){ //Get parents this.client = client; this.player = player; + //Set handler type + this.type = type + + //Set last received timestamp to 0 this.lastTimestamp = 0; //Ingest media object from server @@ -122,7 +126,20 @@ class mediaHandler{ class nullHandler extends mediaHandler{ constructor(client, player){ //Call derived constructor - super(client, player, {}); + super(client, player, {}, null); + + this.defineListeners(); + } + + defineListeners(){ + //Disable right clicking + this.video.addEventListener('contextmenu', (e)=>{e.preventDefault()}); + this.video.addEventListener('loadedmetadata', this.onMetadataLoad.bind(this)); + } + + onMetadataLoad(event){ + //Resize aspect (if locked), since the video doesn't properly report it's resolution until it's been loaded + this.client.chatBox.resizeAspect(); } start(){ @@ -133,7 +150,7 @@ class nullHandler extends mediaHandler{ this.video.src = '/video/static.webm'; //Set video title manually - this.player.title.textContent = 'Channel Off Air'; + this.player.title.textContent = 'Channel Off Air'; //play the placeholder video this.video.play(); @@ -143,16 +160,18 @@ class nullHandler extends mediaHandler{ class rawFileHandler extends mediaHandler{ constructor(client, player, media){ //Call derived constructor - super(client, player, media); + super(client, player, media, 'raw'); - //Since this media type has no way to tell between the user and code seek events, we need a flag to mark them - this.selfSeek = false; + //Since this media type has no way to tell between events that originate from either the user or code + //That's what this boolean is for :P + this.selfAct = false; //Define listeners this.defineListeners(); } defineListeners(){ + this.video.addEventListener('loadedmetadata', this.onMetadataLoad.bind(this)); this.video.addEventListener('pause', this.onPause.bind(this)); this.video.addEventListener('seeking', this.onSeek.bind(this)); } @@ -180,37 +199,56 @@ class rawFileHandler extends mediaHandler{ } sync(timestamp = this.lastTimestamp){ - //Set self seek flag - this.selfSeek = true; + //Skip sync calls that won't seek so we don't pointlessly throw selfAct + if(timestamp != this.video.currentTime){ + //Set self act flag + this.selfAct = true; - //Set current video time based on timestamp received from server - this.video.currentTime = timestamp; + //Set current video time based on timestamp received from server + this.video.currentTime = timestamp; + } } reload(){ - //Throw self seek flag to make sure we don't un-sync the player - this.selfSeek = true; + //Throw self act flag to make sure we don't un-sync the player + this.selfAct = true; //Call derived reload function super.reload(); } + onMetadataLoad(event){ + //Resize aspect (if locked), since the video doesn't properly report it's resolution until it's been loaded + this.client.chatBox.resizeAspect(); + } + onPause(event){ - this.player.unlockSync(); + //If the video was paused out-side of code + if(!this.selfAct){ + this.player.unlockSync(); + } + + this.selfAct = false; } onSeek(event){ //If the video was seeked out-side of code - if(!this.selfSeek){ + if(!this.selfAct){ this.player.unlockSync(); } - //reset self seek flag - this.selfSeek = false; + //reset self act flag + this.selfAct = false; } getTimestamp(){ //Return current timestamp return this.video.currentTime; } + + end(){ + //Throw self act to prevent unlock on video end + this.selfAct = true; + super.end(); + } } \ No newline at end of file diff --git a/www/js/channel/player.js b/www/js/channel/player.js index 352b1e2..adeb85e 100644 --- a/www/js/channel/player.js +++ b/www/js/channel/player.js @@ -41,13 +41,12 @@ class player{ this.reloadIcon = document.querySelector("#media-panel-reload-icon"); //Numbers - this.syncTolerance = 1; + this.syncTolerance = 0.4; this.syncDelta = 6; //run setup functions this.setupInput(); this.defineListeners(); - this.lockSync(); } setupInput(){ @@ -97,6 +96,9 @@ class player{ } } + //Lock synchronization since everyone starts at 0, and update the UI + this.lockSync(); + //Re-size to aspect since video may now be a different size this.client.chatBox.resizeAspect(); } @@ -133,27 +135,32 @@ class player{ //Replace it with a null handler this.mediaHandler = new nullHandler(client, this); + + //Re-lock sync since we're probably gonna start new media soon anywho, and we need to update the UI anywho + this.lockSync(); } lockSync(){ - //Light up the sync icon to show that we're actively synchronized - this.syncIcon.classList.add('positive'); - //Enable syncing this.syncLock = true; - //If we have a media handler - if(this.mediaHandler != null){ + if(this.mediaHandler != null && this.mediaHandler.type != null){ + //Light up the sync icon to show that we're actively synchronized + this.syncIcon.classList.add('positive'); + //Sync to last timestamp this.mediaHandler.sync(); //Play this.mediaHandler.play(); + }else{ + //Unlight the sync icon since there is nothing to sync + this.syncIcon.classList.remove('positive'); } } unlockSync(){ - //Unlight the sync icon + //Unlight the sync icon since we're no longer actively synced this.syncIcon.classList.remove('positive'); //Disable syncing