237 lines
6.6 KiB
TypeScript
237 lines
6.6 KiB
TypeScript
import {fetchMetadata} from "./services/metadata";
|
|
import {DEFAULT_LUCIDA_OPTIONS, DownloadResult, LucidaOptions, ProcessingQueue, Queue, QueueItem} from "./types/queue";
|
|
import Lucida from "./services/lucida";
|
|
import * as fs from "node:fs";
|
|
|
|
const TIMEOUT: number = 120000;
|
|
const RETRIES: number = 5;
|
|
|
|
const STATE_FILE: string = '/data/state.json';
|
|
|
|
class QueueManager {
|
|
private readonly queue: Queue;
|
|
private readonly processing: ProcessingQueue;
|
|
private readonly history: Queue;
|
|
|
|
private lucida: Lucida | null;
|
|
private lucidaOptions: LucidaOptions;
|
|
|
|
public constructor() {
|
|
this.queue = [];
|
|
this.processing = [];
|
|
this.history = [];
|
|
|
|
this.lucida = null;
|
|
this.lucidaOptions = DEFAULT_LUCIDA_OPTIONS;
|
|
|
|
this.loadState();
|
|
}
|
|
|
|
private saveState(): void {
|
|
const state = {
|
|
queue: this.processing.map(pi => pi.item).concat(this.queue),
|
|
history: this.history
|
|
};
|
|
fs.writeFileSync(STATE_FILE, JSON.stringify(state, null, 4));
|
|
}
|
|
|
|
private loadState(): void {
|
|
if (!fs.existsSync(STATE_FILE)) {
|
|
return;
|
|
}
|
|
|
|
const state = JSON.parse(fs.readFileSync(STATE_FILE, 'utf8'));
|
|
|
|
console.log('QueueManager: Restoring state:', state);
|
|
|
|
this.queue.push(...state.queue);
|
|
|
|
state.history.forEach((item: QueueItem) => {
|
|
if (!this.queue.some(q => q.id === item.id)) {
|
|
if (item.result?.success) {
|
|
// Add successful items to the history
|
|
this.history.push(item);
|
|
} else {
|
|
// Add failed items to the queue
|
|
this.queue.push(item);
|
|
}
|
|
}
|
|
});
|
|
|
|
this.processQueue();
|
|
}
|
|
|
|
public setLucidaOptions(options: LucidaOptions): void {
|
|
console.log('QueueManager: Setting Lucida options:', options);
|
|
this.lucidaOptions = options;
|
|
}
|
|
|
|
public getQueue(): Queue {
|
|
return this.queue;
|
|
}
|
|
|
|
public getProcessing(): ProcessingQueue {
|
|
return this.processing;
|
|
}
|
|
|
|
public getHistory(): Queue {
|
|
return this.history;
|
|
}
|
|
|
|
public get(index: number): QueueItem {
|
|
if (index < 0 || index >= this.queue.length) {
|
|
throw new Error('Index out of bounds');
|
|
}
|
|
|
|
return this.queue[index];
|
|
}
|
|
|
|
public async add(song: string): Promise<QueueItem> {
|
|
// Check if the URL is already in the queue.
|
|
if (this.queue.some(q => q.id === song)) {
|
|
throw new Error('Song already in queue');
|
|
}
|
|
|
|
const item: QueueItem = {
|
|
id: song,
|
|
song: await fetchMetadata(song)
|
|
};
|
|
this.queue.push(item);
|
|
this.saveState();
|
|
this.processQueue();
|
|
|
|
return item;
|
|
}
|
|
|
|
public async insertAt(song: string, index: number): Promise<QueueItem> {
|
|
// Check if the URL is already in the queue.
|
|
if (this.queue.some(q => q.id === song)) {
|
|
throw new Error('Song already in queue');
|
|
}
|
|
|
|
const item: QueueItem = {
|
|
id: song,
|
|
song: await fetchMetadata(song)
|
|
};
|
|
this.queue.splice(index, 0, item);
|
|
this.saveState();
|
|
this.processQueue();
|
|
|
|
return item;
|
|
}
|
|
|
|
public async remove(song: string): Promise<QueueItem> {
|
|
const index: number = this.queue.findIndex(q => q.id === song);
|
|
if (index === -1) {
|
|
throw new Error('Song not found');
|
|
}
|
|
|
|
const item: QueueItem = this.queue[index];
|
|
this.queue.splice(index, 1);
|
|
this.saveState();
|
|
return item;
|
|
}
|
|
|
|
public clear(): void {
|
|
this.queue.length = 0;
|
|
this.saveState();
|
|
}
|
|
|
|
public move(song: string, to: number): QueueItem {
|
|
const index: number = this.queue.findIndex(q => q.id === song);
|
|
if (index === -1) {
|
|
throw new Error('Song not found');
|
|
}
|
|
|
|
const item: QueueItem = this.queue.splice(index, 1)[0];
|
|
this.queue.splice(to, 0, item);
|
|
this.saveState();
|
|
return item;
|
|
}
|
|
|
|
public retry(song: string): QueueItem {
|
|
const index: number = this.history.findIndex(q => q.id === song);
|
|
if (index === -1) {
|
|
throw new Error('Song not found');
|
|
}
|
|
|
|
const item: QueueItem = this.history.splice(index, 1)[0];
|
|
item.retries = 0;
|
|
this.queue.push(item);
|
|
this.saveState();
|
|
this.processQueue();
|
|
|
|
return item;
|
|
}
|
|
|
|
public async processQueue(): Promise<void> {
|
|
if (this.lucida !== null) {
|
|
return;
|
|
}
|
|
|
|
this.lucida = new Lucida(this.lucidaOptions);
|
|
await this.lucida.construct();
|
|
|
|
while (0 < this.queue.length) {
|
|
const item: QueueItem | undefined = this.queue.shift();
|
|
if (!item) {
|
|
console.log('QueueManager: Detected undefined item');
|
|
continue;
|
|
}
|
|
|
|
const current: {item: QueueItem, status: string} = {
|
|
item: item,
|
|
status: 'Starting'
|
|
}
|
|
this.processing.push(current);
|
|
|
|
try {
|
|
const timeout: number = (item.timeout ?? TIMEOUT) * item.song.trackCount;
|
|
const result: DownloadResult = await this.lucida.download(item.song.url, '/tmp', timeout, current);
|
|
|
|
if (!result.success && (item.retries ?? 0) + 1 < RETRIES) {
|
|
item.retries = (item.retries ?? 0) + 1;
|
|
item.timeout = (item.timeout ?? TIMEOUT) * 2;
|
|
item.result = result;
|
|
this.queue.push(item);
|
|
this.saveState();
|
|
} else {
|
|
item.result = result;
|
|
this.history.push(item);
|
|
this.saveState();
|
|
}
|
|
} catch (err) {
|
|
item.result = {
|
|
success: false,
|
|
error: err instanceof Error ? err.message : 'Unknown error'
|
|
};
|
|
this.history.push(item);
|
|
this.saveState();
|
|
}
|
|
|
|
this.processing.splice(this.processing.indexOf(current), 1);
|
|
}
|
|
|
|
await this.lucida.destruct();
|
|
this.lucida = null;
|
|
}
|
|
|
|
public async forceStop(): Promise<void> {
|
|
this.processing.forEach(item => {
|
|
item.item.result = {
|
|
success: false,
|
|
error: 'Forced stop'
|
|
};
|
|
this.history.push(item.item);
|
|
})
|
|
|
|
if (this.lucida === null) {
|
|
return;
|
|
}
|
|
|
|
await this.lucida.destruct('Forced stop');
|
|
this.lucida = null;
|
|
}
|
|
}
|
|
|
|
export default new QueueManager();
|