canopy/src/app/channel/media/queue.js

658 lines
24 KiB
JavaScript

/*Canopy - The next generation of stoner streaming software
Copyright (C) 2024-2025 Rainbownapkin and the TTN Community
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.*/
//NPM imports
const validator = require('validator');
//Local imports
const queuedMedia = require('./queuedMedia');
const yanker = require('../../../utils/media/yanker');
const loggerUtils = require('../../../utils/loggerUtils');
const channelModel = require('../../../schemas/channel/channelSchema');
module.exports = class{
constructor(server, channel){
//Set server
this.server = server
//Set channel
this.channel = channel;
//Create map to hold currently queued media
this.schedule = new Map();
//Create variable to hold sync delta in ms
this.syncDelta = 1000;
//Create variable to hold current timestamp within the video
this.timestamp = 0;
//Create variable to hold sync timer
this.syncTimer = null;
//Create variable to hold next playing item timer
this.nextTimer = null;
//Create variable to hold currently playing media object
this.nowPlaying = null;
//create boolean to hold schedule lock
this.locked = false;
}
defineListeners(socket){
socket.on("queue", (data) => {this.queueURL(socket, data)});
socket.on("stop", (data) => {this.stopMedia(socket)});
socket.on("delete", (data) => {this.deleteMedia(socket, data)});
socket.on("move", (data) => {this.moveMedia(socket, data)});
socket.on("clear", (data) => {this.deleteRange(socket, data)});
socket.on("lock", (data) => {this.toggleLock(socket)});
}
async queueURL(socket, data){
//Get the current channel from the database
const chanDB = await channelModel.findOne({name: socket.chan});
if((!this.locked && await chanDB.permCheck(socket.user, 'scheduleMedia')) || await chanDB.permCheck(socket.user, 'scheduleAdmin')){
try{
//Set url
var url = data.url;
//If we where given a bad URL
if(!validator.isURL(url)){
//Attempt to fix the situation by encoding it
url = encodeURI(url);
//If it's still bad
if(!validator.isURL(url)){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "Bad URL!", "validation");
//and ignore it!
return;
}
}
//If the title is too long
if(!validator.isLength(data.title, {max:30})){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "Title too long!", "validation");
//and ignore it!
return;
}
//Set title
const title = validator.escape(validator.trim(data.title));
//set start
var start = data.start;
//If start time isn't an integer after the current epoch
if(start != null &&!validator.isInt(String(start), (new Date().getTime()))){
//Null out time to tell the later parts of the function to start it now
start = null;
}
//Pull media list
const mediaList = await yanker.yankMedia(url, title);
//If we didn't find any media
if(mediaList == null || mediaList.length <= 0){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "No media found!", "queue");
//and ignore it!
return;
}
//Queue the first media object given
this.queueMedia(mediaList[0], start, socket);
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
}
async deleteRange(socket, data){
//Get the current channel from the database
const chanDB = await channelModel.findOne({name: socket.chan});
if((!this.locked && await chanDB.permCheck(socket.user, 'clearSchedule')) || await chanDB.permCheck(socket.user, 'scheduleAdmin')){
try{
//If start time isn't an integer
if(data.start != null && !validator.isInt(String(data.start))){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "Bad start date!", "queue");
//and ignore it!
return;
}
//If end time isn't an integer
if(data.end != null && !validator.isInt(String(data.end))){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "Bad end date!", "queue");
//and ignore it!
return;
}
this.removeRange(data.start, data.end, socket);
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
}
async deleteMedia(socket, data){
//Get the current channel from the database
const chanDB = await channelModel.findOne({name: socket.chan});
if((!this.locked && await chanDB.permCheck(socket.user, 'scheduleMedia')) || await chanDB.permCheck(socket.user, 'scheduleAdmin')){
try{
//If we don't have a valid UUID
if(!validator.isUUID(data.uuid)){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "Bad UUID!", "validation");
//and ignore it!
return;
}
//Remove media by UUID
this.removeMedia(data.uuid, socket);
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
}
async moveMedia(socket, data){
//Get the current channel from the database
const chanDB = await channelModel.findOne({name: socket.chan});
if((!this.locked && await chanDB.permCheck(socket.user, 'scheduleMedia')) || await chanDB.permCheck(socket.user, 'scheduleAdmin')){
try{
//If we don't have a valid UUID
if(!validator.isUUID(data.uuid)){
//Bitch, moan, complain...
loggerUtils.socketErrorHandler(socket, "Bad UUID!", "validation");
//and ignore it!
return;
}
//If start time isn't an integer
if(data.start != null && !validator.isInt(String(data.start))){
//Null out time to tell the later parts of the function to start it now
data.start = undefined;
}
//Move media by UUID
this.rescheduleMedia(data.uuid, data.start, socket);
}catch(err){
return loggerUtils.socketExceptionHandler(socket, err);
}
}
}
async toggleLock(socket){
//Get the current channel from the database
const chanDB = await channelModel.findOne({name: socket.chan});
//If the user is a schedule admin
if(await chanDB.permCheck(socket.user, 'scheduleAdmin')){
//Toggle the schedule lock
this.locked = !this.locked;
//Update schedule lock status for everyone in the channel
this.server.io.in(this.channel.name).emit("lock", {locked: this.locked});
}
}
//Default start time to now + half a second to give everyone time to process shit
queueMedia(inputMedia, start, socket){
//If we have an invalid time
if(start == null || start < (new Date).getTime()){
//Get last item from schedule
const lastItem = (Array.from(this.schedule)[this.schedule.size - 1]);
const now = new Date().getTime()
//if we have a last item
if(lastItem != null){
//If the last item has ended
if(lastItem[1].getEndTime() < now){
start = now + 5;
//If it hasn't started yet
}else{
//Throw it on five ms after the last item
start = lastItem[1].getEndTime() + 5;
}
}else{
//Throw it on five ms after the last item
start = now + 5;
}
}
//Create a new media queued object, set start time to now
const mediaObj = queuedMedia.fromMedia(inputMedia, start, 0);
//schedule the media
this.scheduleMedia(mediaObj, socket);
}
refreshNextTimer(){
//Grab the next item
const nextItem = this.getNextItem();
//If we have no next item
if(nextItem == null){
//Get current item
const currentItem = this.getItemAtEpoch()
//If we have a current item and it isn't currently playing
if(currentItem != null && (this.nowPlaying == null || currentItem.uuid != this.nowPlaying.uuid)){
//Start the found item at w/ a pre-calculated time stamp to reflect the given start time
this.start(currentItem, Math.round((new Date().getTime() - currentItem.startTime) / 1000) + currentItem.startTimeStamp);
}
//otherwise if we have an item
}else{
//Calculate the amount of time in ms that the next item will start in
const startsIn = nextItem.startTime - new Date().getTime();
//Clear out any item that might be up next
clearTimeout(this.nextTimer);
//Set the next timer
this.nextTimer = setTimeout(()=>{this.start(nextItem, nextItem.startTimeStamp)}, startsIn);
}
}
removeRange(start = new Date().getTime() - 60 * 1000, end = new Date().getTime(), socket){
//Find items within given range
const foundItems = this.getItemsBetweenEpochs(start, end);
//For each item
for(let item of foundItems){
//Remove media
this.removeMedia(item.uuid, socket);
}
}
rescheduleMedia(uuid, start = new Date().getTime() + 5, socket){
//Find our media, don't remove it yet since we want to do some more testing first
const media = this.getItemByUUID(uuid);
//If we got a bad request
if(media == null){
//If an originating socket was provided for this request
if(socket != null){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "Cannot move non-existant item!", "queue");
}
//Ignore it
return;
}
//If someone is trying to re-schedule something that starts in the past
if(media.startTime < new Date().getTime()){
//If an originating socket was provided for this request
if(socket != null){
//If the item is currently playing
if(media.getEndTime() > new Date().getTime()){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "You cannot move an actively playing video!", "queue");
//Otherwise, if it's already ended
}else{
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "You cannot alter the past!", "queue");
}
}
//Ignore it
return;
}
//Remove the media from the schedule
this.removeMedia(uuid);
//Grab the old start time for safe keeping
const oldStart = media.startTime;
//Set media time
media.startTime = start;
//Reset the start time stamp for re-calculation
media.startTimeStamp = 0;
//Attempt to schedule media at given time
//Otherwise, if it returns false for fuckup
if(!this.scheduleMedia(media, socket)){
//Reset start time
media.startTime = oldStart;
//Reset the start time stamp for re-calculation
media.startTimeStamp = 0;
//Schedule in old slot
this.scheduleMedia(media, socket, true);
}
}
removeMedia(uuid, socket){
//Get requested media
const media = this.getItemByUUID(uuid);
//If we got a bad request
if(media == null){
//If an originating socket was provided for this request
if(socket != null){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "Cannot delete non-existant item!", "queue");
}
//Ignore it
return false;
}
//If we're currently playing the requested item.
if(this.nowPlaying != null && this.nowPlaying.uuid == uuid){
//End playback
this.end();
}
//Take the item out of the schedule
this.schedule.delete(media.startTime);
//Refresh next timer
this.refreshNextTimer();
//Broadcast the channel queue
this.broadcastQueue();
//return found media in-case our calling function needs it :P
return media;
}
stopMedia(socket){
//If we're not currently playing anything
if(this.nowPlaying == null){
//If an originating socket was provided for this request
if(socket != null){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "No media playing!", "queue");
}
//Ignore it
return false;
}
//Stop playing
const stoppedMedia = this.nowPlaying;
//End the media
this.end();
//Get difference between current time and start time and set as early end
stoppedMedia.earlyEnd = (new Date().getTime() - stoppedMedia.startTime) / 1000;
//Broadcast the channel queue
this.broadcastQueue();
}
scheduleMedia(mediaObj, socket, force = false){
/* This is a fun method and I think it deserves it's own little explination...
Since we're working with a time based schedule, using start epochs as keys for our iterable seemed the best option
I don't want to store everything in a sparse array because that *feels* icky, and would probably be a pain in the ass.
Maps seem like a good choice, if it wheren't for the issue of keeping them ordered...
That's where this comes in. You see if we temporarily store it in a sparse array and convert into a map,
we can quickly and easily create a properly sorted schedule map that, out side of adding items, behaves normally.
Also a note on preformance:
While .forEach ONLY runs through populated items in sparse arrays, many JS implementations run through them in the background,
simply skipping them before executing the provided function. Looping through object.keys(arr), however, avoids this entirely,
since it ONLY loops through defiened items within the array. No skipped empties for your runtime to worry about.
Even more preformance benefits can be had by using a real for loop on the arrays keys, skipping the overhead of forEach entirely.
This might seem gross but it completely avoids the computational workload of a sorting algo, especially when you consider
that, no matter what, re-ordering the schedule map would've required us to iterate through and convert it to an array and back anyways...
Also it looks like due to implementation limitations, epochs stored as MS are too large for array elements, so we store them there as seconds.
This also means that our current implementation will break exactly on unix epoch 4294967295 (Feb 7, 2106 6:28:15 AM UTC)
Hopefully javascript arrays will allow for larger lengths by then. If not blame the W3C :P
If for some reason they haven't and we're not dead, we could probably implement an object that wraps a 2d array and set/gets it using modulo/devision/multiplication
Further Reading:
https://stackoverflow.com/questions/59480871/foreach-vs-object-keys-foreach-performance-on-sparse-arrays
https://community.appsmith.com/content/blog/dark-side-foreach-why-you-should-think-twice-using-it
*/
//If someone is trying to schedule something that starts and ends in the past
if((mediaObj.getEndTime() < new Date().getTime()) && !force){
//If an originating socket was provided for this request
if(socket != null){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "You cannot alter the past!", "queue");
}
return false;
}
//If the item has already started and it's not being forced
if((mediaObj.startTime < new Date().getTime())){
//Set time stamp to existing timestamp plus the difference between the orginal start-date and now
mediaObj.startTimeStamp = mediaObj.startTimeStamp + ((new Date().getTime() - mediaObj.startTime) / 1000)
//Start the item now
mediaObj.startTime = new Date().getTime() + 5;
}
//If there's already something queued right now
if(this.getItemAtEpoch(mediaObj.startTime) != null || this.getItemAtEpoch(mediaObj.getEndTime())){
//If an originating socket was provided for this request
if(socket != null){
//Yell at the user for being an asshole
loggerUtils.socketErrorHandler(socket, "This time slot has already been taken in the queue!", "queue");
}
//Ignore it
return false;
}
//Create an empty temp array to sparsley populate with our schedule
const tempSchedule = [];
//Create new map to replace our current schedule map
const newSchedule = new Map();
//For every item that's already been scheduled
for(let item of this.schedule){
//add it to the slot corresponding to it's start epoch in seconds
tempSchedule[Math.round(item[0] / 1000)] = item[1];
}
//Inject the media object into the slot corresponding to it's epoch in the temp schedule array
tempSchedule[Math.round(mediaObj.startTime / 1000)] = mediaObj;
//For every populated key in our array
for(let startTime of Object.keys(tempSchedule)){
//Add item to replacement schedule map
newSchedule.set(tempSchedule[startTime].startTime, tempSchedule[startTime]);
}
//Replace the existing schedule map with our new one
this.schedule = newSchedule;
//Broadcast the channel queue
this.broadcastQueue();
//Refresh the next timer to ensure whatever comes on next is right
this.refreshNextTimer();
//return media object for use
return mediaObj;
}
start(mediaObj, timestamp = mediaObj.startTimeStamp){
//Silently end the media
this.end(true);
//reset current timestamp
this.timestamp = timestamp;
//Set current playing media
this.nowPlaying = mediaObj;
//Send play signal out to the channel
this.sendMedia();
//Kick off the sync timer
this.syncTimer = setTimeout(this.sync.bind(this), this.syncDelta);
//Setup the next video
this.refreshNextTimer();
//return media object for use
return mediaObj;
}
sync(){
//Send sync signal out to the channel
this.server.io.in(this.channel.name).emit("sync", {timestamp: this.timestamp});
//If the media has over a second to go
if((this.timestamp + 1) < this.nowPlaying.duration){
//Increment the time stamp
this.timestamp += (this.syncDelta / 1000);
//Call the sync function in another second
this.syncTimer = setTimeout(this.sync.bind(this), this.syncDelta);
}else{
//Get leftover video length in ms
const leftover = (this.nowPlaying.duration - this.timestamp) * 1000;
//Call the end function once the video is over
this.syncTimer = setTimeout(this.end.bind(this), leftover);
}
}
end(quiet = false){
//Call off any existing sync timer
clearTimeout(this.syncTimer);
//Clear out the sync timer
this.syncTimer = null;
//Clear now playing
this.nowPlaying = null;
//Clear timestamp
this.timestamp = 0;
//If we're not being quiet
if(!quiet){
//Tell everyone of the end-times
this.server.io.in(this.channel.name).emit('end', {});
}
}
getItemsBetweenEpochs(start, end){
//Create an empty array to hold found items
const foundItems = [];
//Loop through scheduled items
for(let item of this.schedule){
//If the item starts after our start date and before our end date
if(item[0] >= start && item[0] <= end ){
//Add the current item to the list
foundItems.push(item[1]);
}
}
//Return any found items
return foundItems;
}
getItemAtEpoch(epoch = new Date().getTime()){
//Loop through scheduled items
for(let item of this.schedule){
//If we're past or at the start time and at or before the end time
if(item[0] <= epoch && item[1].getEndTime() >= epoch){
//return the current item
return item[1]
}
}
//If we fell through the loop return null
return null;
}
getLastItem(epoch = new Date().getTime()){
//Create variable to hold the last item
let last;
//Loop through scheduled items
for(let item of this.schedule){
//If we've stumbled on to the next item
if(item[0] >= epoch){
//Break the loop
break;
//If we've stumbled upon an item that is currently playing
}else if(item[1].getEndTime() >= epoch){
//Break the loop
break;
//If we made it through this iteration without breaking the loop
}
//Set current item to last item
last = item[1];
}
//If the loop has been broken or fallen through, return last.
return last;
}
getNextItem(epoch = new Date().getTime()){
//Iterate through the schedule
for(let item of this.schedule){
if(item[0] >= epoch){
//Pull the scheduled media object from the map entry array
return item[1];
}
}
}
getItemByUUID(uuid){
//Iterate through the schedule
for(let item of this.schedule){
//If the uuid matches
if(item[1].uuid == uuid){
//return the found item
return item[1];
}
}
}
sendMedia(socket){
//Create data object
const data = {
media: this.nowPlaying,
timestamp: this.timestamp
}
//If a socket is specified
if(socket != null){
//Send data out to specified socket
socket.emit("start", data);
//Otherwise
}else{
//Send that shit out to the entire channel
this.server.io.in(this.channel.name).emit("start", data);
}
}
broadcastQueue(){
this.server.io.in(this.channel.name).emit('queue',{queue: Array.from(this.schedule)})
}
}