import { query, mutation, internalMutation } from "./_generated/server";
import type { MutationCtx } from "./_generated/server";
import { v } from "convex/values";
import type { GenericId } from "convex/values";
import { internal } from "./_generated/api";
import { getPublicStorageUrl } from "./storageUrl";
import { getRolesForUser } from "./roles";

/** Id of a document in the `userProfiles` table. */
type UserId = GenericId<"userProfiles">;

/**
 * Age (ms) after which a voice state without a fresh heartbeat is considered
 * stale. Also used as the delay between stale-cleanup runs.
 */
const STALE_AFTER_MS = 90_000;

/**
 * Deletes every `voiceStates` row belonging to `userId`.
 *
 * @returns The number of rows that were deleted.
 */
async function removeUserVoiceStates(
  ctx: MutationCtx,
  userId: UserId
): Promise<number> {
  const existing = await ctx.db
    .query("voiceStates")
    .withIndex("by_user", (q) => q.eq("userId", userId))
    .collect();
  for (const state of existing) {
    await ctx.db.delete(state._id);
  }
  return existing.length;
}

/** Returns the first `voiceStates` row for `userId`, or `null` if there is none. */
async function findVoiceState(ctx: MutationCtx, userId: UserId) {
  return await ctx.db
    .query("voiceStates")
    .withIndex("by_user", (q) => q.eq("userId", userId))
    .first();
}

/**
 * Unsets `watchingStream` on every voice state that currently points at
 * `streamerId`. Scans the whole `voiceStates` table.
 */
async function clearStreamViewers(
  ctx: MutationCtx,
  streamerId: UserId
): Promise<void> {
  const allStates = await ctx.db.query("voiceStates").collect();
  for (const state of allStates) {
    if (state.watchingStream === streamerId) {
      await ctx.db.patch(state._id, { watchingStream: undefined });
    }
  }
}

/**
 * Returns true when at least one role returned by `getRolesForUser` for
 * `userId` has a truthy entry for `permission` in its `permissions` map.
 */
async function hasPermission(
  ctx: MutationCtx,
  userId: UserId,
  permission: string
): Promise<boolean> {
  const roles = await getRolesForUser(ctx, userId);
  return roles.some(
    (role) => (role.permissions as Record<string, boolean>)?.[permission]
  );
}

/**
 * Puts a user into a voice channel.
 *
 * Any previous voice state of the user is removed first, so a user has at most
 * one voice state after this call. If a previous state was removed, viewers
 * that were watching the user's stream are cleared, because the new state
 * always starts with `isScreenSharing: false`.
 *
 * Also schedules `cleanStaleStates` to run after `STALE_AFTER_MS`.
 * NOTE(review): every join schedules another cleanup run, and each run
 * re-schedules itself while active states exist, so concurrent cleanup chains
 * can accumulate — consider de-duplicating the scheduled job.
 */
export const join = mutation({
  args: {
    channelId: v.id("channels"),
    userId: v.id("userProfiles"),
    username: v.string(),
    isMuted: v.boolean(),
    isDeafened: v.boolean(),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const removedCount = await removeUserVoiceStates(ctx, args.userId);
    if (removedCount > 0) {
      // The previous session (and any screen share in it) is gone.
      await clearStreamViewers(ctx, args.userId);
    }

    await ctx.db.insert("voiceStates", {
      channelId: args.channelId,
      userId: args.userId,
      username: args.username,
      isMuted: args.isMuted,
      isDeafened: args.isDeafened,
      isScreenSharing: false,
      isServerMuted: false,
      lastHeartbeat: Date.now(),
    });

    // Schedule stale cleanup to run once this state could have gone stale.
    await ctx.scheduler.runAfter(
      STALE_AFTER_MS,
      internal.voiceState.cleanStaleStates,
      {}
    );

    return null;
  },
});

/**
 * Removes the user's voice state(s). If anything was removed, viewers that
 * were watching the user's stream are cleared as well, matching what
 * `disconnectUser` and `cleanStaleStates` do when a user goes away.
 */
export const leave = mutation({
  args: {
    userId: v.id("userProfiles"),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const removedCount = await removeUserVoiceStates(ctx, args.userId);
    if (removedCount > 0) {
      await clearStreamViewers(ctx, args.userId);
    }
    return null;
  },
});

/**
 * Patches the provided mute / deafen / screen-share flags onto the user's
 * voice state. Omitted flags are left untouched. No-op when the user has no
 * voice state.
 */
export const updateState = mutation({
  args: {
    userId: v.id("userProfiles"),
    isMuted: v.optional(v.boolean()),
    isDeafened: v.optional(v.boolean()),
    isScreenSharing: v.optional(v.boolean()),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const existing = await findVoiceState(ctx, args.userId);
    if (!existing) return null;

    // Drop `userId` and any omitted flag: patching a field with `undefined`
    // would remove it from the document instead of leaving it unchanged.
    const { userId: _userId, ...updates } = args;
    const provided = Object.fromEntries(
      Object.entries(updates).filter(([, value]) => value !== undefined)
    );
    await ctx.db.patch(existing._id, provided);

    // When a user stops screen sharing, clear all viewers watching their stream.
    if (args.isScreenSharing === false) {
      await clearStreamViewers(ctx, args.userId);
    }

    return null;
  },
});

/**
 * Sets or clears the server-mute flag on the target user's voice state.
 *
 * @throws Error if the actor has no role with the `mute_members` permission,
 *   or if the target user has no voice state.
 */
export const serverMute = mutation({
  args: {
    actorUserId: v.id("userProfiles"),
    targetUserId: v.id("userProfiles"),
    isServerMuted: v.boolean(),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const canMute = await hasPermission(ctx, args.actorUserId, "mute_members");
    if (!canMute) {
      throw new Error("You don't have permission to server mute members");
    }

    const existing = await findVoiceState(ctx, args.targetUserId);
    if (!existing) throw new Error("Target user is not in a voice channel");

    await ctx.db.patch(existing._id, { isServerMuted: args.isServerMuted });
    return null;
  },
});

/**
 * Records which user's stream `userId` is watching. Omitting `watchingStream`
 * clears the field. No-op when the user has no voice state.
 */
export const setWatchingStream = mutation({
  args: {
    userId: v.id("userProfiles"),
    watchingStream: v.optional(v.id("userProfiles")),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const existing = await findVoiceState(ctx, args.userId);
    if (existing) {
      await ctx.db.patch(existing._id, {
        watchingStream: args.watchingStream,
      });
    }
    return null;
  },
});

/**
 * Returns all voice states grouped by channel id. Each entry is enriched with
 * the user's display name and the public URLs of their avatar and join sound
 * (each `null` when the profile has no corresponding value).
 */
export const getAll = query({
  args: {},
  returns: v.any(),
  handler: async (ctx) => {
    const states = await ctx.db.query("voiceStates").collect();

    // Profile and storage-URL lookups are independent per state, so run them
    // in parallel; Promise.all preserves the order of `states`.
    const entries = await Promise.all(
      states.map(async (state) => {
        const user = await ctx.db.get(state.userId);

        let avatarUrl: string | null = null;
        if (user?.avatarStorageId) {
          avatarUrl = await getPublicStorageUrl(ctx, user.avatarStorageId);
        }

        let joinSoundUrl: string | null = null;
        if (user?.joinSoundStorageId) {
          joinSoundUrl = await getPublicStorageUrl(
            ctx,
            user.joinSoundStorageId
          );
        }

        return {
          channelId: state.channelId,
          member: {
            userId: state.userId,
            username: state.username,
            displayName: user?.displayName || null,
            isMuted: state.isMuted,
            isDeafened: state.isDeafened,
            isScreenSharing: state.isScreenSharing,
            isServerMuted: state.isServerMuted,
            avatarUrl,
            joinSoundUrl,
            watchingStream: state.watchingStream ?? null,
          },
        };
      })
    );

    const grouped: Record<
      string,
      Array<(typeof entries)[number]["member"]>
    > = {};
    for (const { channelId, member } of entries) {
      (grouped[channelId] ??= []).push(member);
    }
    return grouped;
  },
});

/**
 * Moves a user into the AFK channel, muted and with screen sharing stopped.
 *
 * No-op if the user has no voice state or is already in the AFK channel.
 *
 * @throws Error if `afkChannelId` does not match the `afkChannelId` stored in
 *   the first `serverSettings` row (or no such row exists).
 */
export const afkMove = mutation({
  args: {
    userId: v.id("userProfiles"),
    afkChannelId: v.id("channels"),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Validate afkChannelId matches server settings.
    const settings = await ctx.db.query("serverSettings").first();
    if (!settings || settings.afkChannelId !== args.afkChannelId) {
      throw new Error("Invalid AFK channel");
    }

    const currentState = await findVoiceState(ctx, args.userId);

    // No-op if not in voice or already in AFK channel.
    if (!currentState || currentState.channelId === args.afkChannelId) {
      return null;
    }

    // Move to AFK channel: delete old state, insert new one muted.
    await ctx.db.delete(currentState._id);
    await ctx.db.insert("voiceStates", {
      channelId: args.afkChannelId,
      userId: args.userId,
      username: currentState.username,
      isMuted: true,
      isDeafened: currentState.isDeafened,
      isScreenSharing: false,
      isServerMuted: currentState.isServerMuted,
      lastHeartbeat: Date.now(),
    });

    // Screen sharing stops on AFK move, so clear viewers of the moved user.
    await clearStreamViewers(ctx, args.userId);

    return null;
  },
});

/**
 * Forcibly removes the target user from voice and clears viewers of their
 * stream.
 *
 * @throws Error if the actor has no role with the `move_members` permission.
 */
export const disconnectUser = mutation({
  args: {
    actorUserId: v.id("userProfiles"),
    targetUserId: v.id("userProfiles"),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const canMove = await hasPermission(ctx, args.actorUserId, "move_members");
    if (!canMove) {
      throw new Error("You don't have permission to disconnect members");
    }

    // Clear viewers watching the target user's stream.
    await clearStreamViewers(ctx, args.targetUserId);

    await removeUserVoiceStates(ctx, args.targetUserId);
    return null;
  },
});

/**
 * Refreshes `lastHeartbeat` on the user's voice state so `cleanStaleStates`
 * keeps it. No-op when the user has no voice state.
 */
export const heartbeat = mutation({
  args: {
    userId: v.id("userProfiles"),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const existing = await findVoiceState(ctx, args.userId);
    if (existing) {
      await ctx.db.patch(existing._id, { lastHeartbeat: Date.now() });
    }
    return null;
  },
});

/**
 * Deletes voice states whose `lastHeartbeat` is older than `STALE_AFTER_MS`
 * and clears `watchingStream` on remaining states that pointed at a removed
 * user. States without a `lastHeartbeat` value are never treated as stale.
 *
 * Re-schedules itself after `STALE_AFTER_MS` while any non-stale state remains.
 */
export const cleanStaleStates = internalMutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const states = await ctx.db.query("voiceStates").collect();
    const staleThreshold = Date.now() - STALE_AFTER_MS;

    // Classify first, mutate afterwards. Deleting while scanning could lead
    // to patching a row that was already deleted earlier in the same pass
    // (a stale viewer of a later stale streamer), which fails the mutation
    // and skips the re-schedule below.
    const staleStateIds = new Set<string>();
    const staleUserIds = new Set<string>();
    for (const state of states) {
      if (state.lastHeartbeat && state.lastHeartbeat < staleThreshold) {
        staleStateIds.add(state._id);
        staleUserIds.add(state.userId);
      }
    }

    for (const state of states) {
      if (staleStateIds.has(state._id)) {
        await ctx.db.delete(state._id);
      } else if (
        state.watchingStream !== undefined &&
        staleUserIds.has(state.watchingStream)
      ) {
        // Surviving viewer of a stream whose owner was just removed.
        await ctx.db.patch(state._id, { watchingStream: undefined });
      }
    }

    // Re-schedule if there are still active voice states.
    const hasActiveStates = staleStateIds.size < states.length;
    if (hasActiveStates) {
      await ctx.scheduler.runAfter(
        STALE_AFTER_MS,
        internal.voiceState.cleanStaleStates,
        {}
      );
    }

    return null;
  },
});

/**
 * Moves the target user to another voice channel, keeping their mute, deafen,
 * screen-share and server-mute flags. The recreated state does not carry over
 * `watchingStream`.
 *
 * No-op if the target is already in the target channel.
 *
 * @throws Error if the actor has no role with the `move_members` permission,
 *   the target channel does not exist or is not of type `"voice"`, or the
 *   target user has no voice state.
 */
export const moveUser = mutation({
  args: {
    actorUserId: v.id("userProfiles"),
    targetUserId: v.id("userProfiles"),
    targetChannelId: v.id("channels"),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Check actor has move_members permission.
    const canMove = await hasPermission(ctx, args.actorUserId, "move_members");
    if (!canMove) {
      throw new Error("You don't have permission to move members");
    }

    // Validate target channel exists and is voice.
    const targetChannel = await ctx.db.get(args.targetChannelId);
    if (!targetChannel) throw new Error("Target channel not found");
    if (targetChannel.type !== "voice") {
      throw new Error("Target channel is not a voice channel");
    }

    // Get target user's current voice state.
    const currentState = await findVoiceState(ctx, args.targetUserId);
    if (!currentState) throw new Error("Target user is not in a voice channel");

    // No-op if already in the target channel.
    if (currentState.channelId === args.targetChannelId) return null;

    // Delete old voice state and insert new one preserving mute/deaf/screenshare.
    await ctx.db.delete(currentState._id);
    await ctx.db.insert("voiceStates", {
      channelId: args.targetChannelId,
      userId: args.targetUserId,
      username: currentState.username,
      isMuted: currentState.isMuted,
      isDeafened: currentState.isDeafened,
      isScreenSharing: currentState.isScreenSharing,
      isServerMuted: currentState.isServerMuted,
      lastHeartbeat: Date.now(),
    });

    return null;
  },
});