Skip to main content

Workers

Location: apps/backend/src/workers/ The backend runs four BullMQ workers that consume jobs produced by the indexer. All workers share the same ioredis connection (src/config/redis.ts) and are registered by importing workers/index.ts at server startup.

Worker Summary

WorkerQueueTriggerDB Operation
trade.worker.tsTradeProcessingQueueTradeExecuted on-chain eventprisma.trade.create()
snapshot.worker.tsSnapshotCalculationQueueAfter any tradeprisma.vault.update() — ROI + drawdown
vault.worker.tsVaultDeployedQueueVaultDeployed on-chain eventprisma.user.upsert() + prisma.vault.create()
follower.worker.tsFollowerEventQueueFollowerSubscribed / FollowerUnsubscribedprisma.followerSubscription.upsert()

TradeWorker

Queue: TradeProcessingQueue
Concurrency: 5

Job Payload (TradeJobPayload)

interface TradeJobPayload {
    txHash: string;        // On-chain transaction hash (unique)
    vaultAddress: string;  // Vault contract address
    assetIn: string;       // Token sold (ERC-20 address)
    assetOut: string;      // Token received (ERC-20 address)
    amountIn: string;      // Amount sold (BigInt string)
    amountOut: string;     // Amount received (BigInt string)
    isProfit: boolean;     // Profitable close flag
    pnlAmount: string;     // PnL amount (BigInt string)
    timestamp: number;     // Block timestamp (UNIX seconds)
}

Processing Logic

  1. Resolves vault.id from vaultAddress via prisma.vault.findUnique({ where: { contractAddress } })
  2. Throws if vault is not in DB — lets BullMQ retry (indexer may have a race condition with VaultDeployedQueue)
  3. Calls prisma.trade.create({ data: { txHash, vaultId, ... } })
  4. Logs success

SnapshotWorker

Queue: SnapshotCalculationQueue

Job Payload

{ vaultAddress: string }

Processing Logic

Triggered after every trade. Recalculates roi and drawdown for the vault and updates prisma.vault.update().

VaultWorker

Queue: VaultDeployedQueue

Job Payload (VaultDeployedPayload)

interface VaultDeployedPayload {
    vaultAddress: string;  // New vault proxy address
    leader: string;        // Leader wallet address
    name: string;          // Vault name (read on-chain by indexer)
    baseAsset: string;     // Base asset address (read on-chain by indexer)
    txHash: string;        // Deployment transaction hash
}

Processing Logic

  1. prisma.user.upsert({ where: { walletAddress: leader } }) — creates the leader as a User if they don’t exist yet
  2. prisma.vault.create({ data: { contractAddress, name, baseAsset, leaderId, status: 'ACTIVE', tvl: '0', roi: 0, drawdown: 0 } })
  3. Logs success

FollowerWorker

Queue: FollowerEventQueue

Job Payload (FollowerPayload)

interface FollowerPayload {
    vaultAddress: string;     // Vault contract address
    followerAddress: string;  // Follower wallet address
    amount: string;           // Deposit or withdrawal amount (BigInt string)
    action: 'DEPOSIT' | 'WITHDRAW';
}

Processing Logic

DEPOSIT:
  1. prisma.user.upsert({ where: { walletAddress: followerAddress } })
  2. If FollowerSubscription exists for (userId, vaultId): adds amount to depositedAmount via BigInt arithmetic
  3. If not: creates new FollowerSubscription
WITHDRAW:
  1. Finds existing FollowerSubscription
  2. Subtracts amount from depositedAmount
  3. If remainingBalance <= 0: deletes the subscription record
  4. Otherwise: updates depositedAmount with the remaining balance

Error Handling

All workers throw on failure, causing BullMQ to mark the job as failed and retry with exponential backoff. Failed jobs persist in Redis and can be inspected via Bull Board or any BullMQ-compatible dashboard.
All workers throw errors on failure, which causes BullMQ to mark the job as failed and retry according to its default backoff policy.
// All workers follow this pattern
worker.on('failed', (job, err) => {
    console.error(`[Worker] Job ${job?.id} failed: ${err.message}`);
});
Failed jobs remain in Redis and can be inspected via Bull Board or the BullMQ dashboard of your choice.