Managing Concurrent Financial Operations Over WebSockets with RBAC

· 5 min read
Backend Engineering
Backend Systems & Architecture

Building real-time applications that handle financial transactions requires careful orchestration of user permissions, concurrent operations, and budget management. In this post, we'll explore architectural patterns for implementing secure, concurrent financial operations over WebSocket connections while respecting role-based access control.

The Engineering Challenge

When users perform real-time operations that consume resources or credits, you need a system that can:

  • Process operations concurrently while maintaining data consistency
  • Enforce permissions before allowing resource consumption
  • Support hierarchical budgets (organization → project → user)
  • Handle race conditions without double-spending
  • Provide real-time feedback to connected clients

High-Level Architecture

A robust solution combines WebSocket messaging with transactional databases and permission systems:

graph TD
A[Client Operation] --> B[WebSocket Handler]
B --> C[Permission Validation]
C --> D[Resource Allocation Logic]
D --> E[Budget Hierarchy Check]
E --> F[Fallback Budget Source]
F --> G[Atomic Transaction]
G --> H[Real-time Update Broadcast]
H --> I[Client Acknowledgment]
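
The first two boxes in the diagram can be sketched as a small parsing step that runs before any permission or budget logic. The message shape (`type`, `amount`, `idempotencyKey`, `details`) and the function name `parseOperationMessage` are assumptions made for illustration; the post does not specify a wire format.

```javascript
// Hypothetical wire format:
// {"type":"consume","amount":10,"idempotencyKey":"abc","details":{"description":"..."}}
function parseOperationMessage(raw) {
  let message;
  try {
    message = JSON.parse(raw);
  } catch {
    return { ok: false, error: 'Malformed JSON' };
  }

  if (message === null || typeof message !== 'object' || message.type !== 'consume') {
    return { ok: false, error: 'Unsupported message type' };
  }
  if (!Number.isFinite(message.amount) || message.amount <= 0) {
    return { ok: false, error: 'Amount must be a positive number' };
  }
  if (typeof message.idempotencyKey !== 'string' || message.idempotencyKey.length === 0) {
    return { ok: false, error: 'Missing idempotency key' };
  }

  return {
    ok: true,
    operation: {
      amount: message.amount,
      idempotencyKey: message.idempotencyKey,
      details: message.details ?? {},
    },
  };
}
```

Rejecting malformed input at this stage keeps the permission and budget layers free of defensive parsing, and requiring an idempotency key up front means every operation that reaches the database can be retried safely.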

Permission-Based Resource Management

Before any resource consumption, validate user permissions within the operational context:

const permissionSystem = require('./permissions');

async function validateAndProcessOperation(connection, resourceAmount, operationDetails, idempotencyKey) {
  const contextId = connection.sessionContext?.contextId || null;
  const organizationId = connection.sessionContext?.organizationId || null;

  // Reject non-numeric or non-positive amounts: a negative "deduction" would add budget
  if (!Number.isFinite(resourceAmount) || resourceAmount <= 0) {
    connection.send(JSON.stringify({
      type: 'error',
      message: 'Invalid resource amount'
    }));
    return { success: false, error: 'Invalid amount' };
  }

  // Verify user has operational permissions in this context
  const hasPermission = await permissionSystem.checkPermission(
    connection.user.id,
    'resource:consume',
    organizationId,
    contextId
  );

  if (!hasPermission) {
    connection.send(JSON.stringify({
      type: 'error',
      message: 'Permission denied: Insufficient privileges for this operation'
    }));
    return { success: false, error: 'Permission denied' };
  }

  // Proceed with resource allocation...
}
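
The post does not show `permissionSystem.checkPermission` itself. One possible shape, assuming roles are granted per organization and optionally narrowed to a single context, is sketched below. The role names, the layout of `grants`, and the synchronous signature are all hypothetical; a real implementation would typically load grants from a database and return a promise.

```javascript
// Hypothetical role → permission mapping
const ROLE_PERMISSIONS = {
  admin: new Set(['resource:consume', 'budget:manage']),
  member: new Set(['resource:consume']),
  viewer: new Set(),
};

// Hypothetical grants: each ties a user to a role within an organization.
// contextId === null means the grant applies organization-wide.
const grants = [
  { userId: 'u1', organizationId: 'org1', contextId: null, role: 'admin' },
  { userId: 'u2', organizationId: 'org1', contextId: 'projA', role: 'member' },
  { userId: 'u3', organizationId: 'org1', contextId: null, role: 'viewer' },
];

function checkPermission(userId, permission, organizationId, contextId) {
  return grants.some((grant) =>
    grant.userId === userId &&
    grant.organizationId === organizationId &&
    // An organization-wide grant covers every context inside the organization
    (grant.contextId === null || grant.contextId === contextId) &&
    ROLE_PERMISSIONS[grant.role]?.has(permission) === true
  );
}
```

With these sample grants, `u2` may consume resources in `projA` but not in any other context, while `u3` holds a role that carries no `resource:consume` permission at all.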

Hierarchical Budget Management

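Implement a cascading budget system. Budgets are allocated top-down (organization → project → individual operations), while spending is resolved bottom-up: try the narrowest budget first and fall back to the organization pool when it cannot cover the request: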

// Smart resource allocation with fallback hierarchy
if (contextId) {
  // This read is only a hint: a concurrent request may spend the budget before
  // the deduction runs, so the authoritative check happens inside the transaction.
  const contextBudget = await budgetService.getContextBudget(contextId);

  if (contextBudget >= resourceAmount) {
    console.log(`Using context budget (${contextBudget} available)`);

    const result = await budgetService.deductFromContext(
      contextId,
      resourceAmount,
      connection.user.id,
      operationDetails,
      idempotencyKey,
      onSuccessCallback
    );

    // If the deduction lost a race and failed, fall through to the organization pool
    if (result.success) {
      return {
        ...result,
        source: 'context_budget',
        contextId: contextId
      };
    }
  }

  console.log('Context budget insufficient, falling back to organization pool');
}

// Fall back to organization-level resource pool
const result = await budgetService.deductFromOrganization(
  organizationId,
  resourceAmount,
  connection.user.id,
  operationDetails,
  idempotencyKey
);
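
The fallback above covers two levels. The same idea extends to a deeper hierarchy if the candidate budgets are ordered from narrowest to broadest. The sketch below is a pure selection function over in-memory balances; `selectBudgetSource` and the `level`, `id`, and `available` fields are illustrative names, and the actual deduction would still need to happen inside a transaction.

```javascript
// Returns the first (narrowest) budget that can cover the whole amount, or null.
// This sketch never splits one operation across several budgets.
function selectBudgetSource(amount, budgets) {
  for (const budget of budgets) {
    if (budget.available >= amount) {
      return { level: budget.level, id: budget.id };
    }
  }
  return null;
}

// Hypothetical balances, ordered narrowest → broadest
const budgets = [
  { level: 'user', id: 'u1', available: 5 },
  { level: 'project', id: 'projA', available: 40 },
  { level: 'organization', id: 'org1', available: 1000 },
];

const smallRequest = selectBudgetSource(3, budgets);     // covered at the user level
const mediumRequest = selectBudgetSource(10, budgets);   // falls back to the project
const largeRequest = selectBudgetSource(500, budgets);   // falls back to the organization
const tooLargeRequest = selectBudgetSource(5000, budgets); // nothing can cover it
```

Charging a single source per operation keeps the audit trail simple: each ledger entry points at exactly one budget.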

Concurrency Control and Race Prevention

Prevent double-spending and maintain consistency using database-level controls:

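// Shown standalone; `this.pool` is assumed to be the budget service's node-postgres Pool.
async function performAtomicResourceDeduction(contextId, amount, userId, details, idempotencyKey, callback) {
  const client = await this.pool.connect();
  let newBudget;
  try {
    await client.query('BEGIN');

    // Acquire exclusive lock on budget row first, so that concurrent requests
    // for the same context (including retries with the same key) are serialized
    const budgetResult = await client.query(
      'SELECT available_budget FROM contexts WHERE id = $1 FOR UPDATE',
      [contextId]
    );

    if (budgetResult.rows.length === 0) {
      await client.query('ROLLBACK');
      return { success: false, error: 'Context not found' };
    }

    // Prevent duplicate operations using idempotency. Checking after the lock means a
    // retry sees the ledger row committed by the first attempt (READ COMMITTED isolation).
    // A UNIQUE constraint on idempotency_key is still advisable as a backstop.
    if (idempotencyKey) {
      const duplicateCheck = await client.query(
        'SELECT id FROM operation_ledger WHERE idempotency_key = $1',
        [idempotencyKey]
      );

      if (duplicateCheck.rows.length > 0) {
        await client.query('ROLLBACK');
        return { success: true, isDuplicate: true };
      }
    }

    // node-postgres returns NUMERIC columns as strings, so convert before comparing
    const currentBudget = Number(budgetResult.rows[0].available_budget);
    if (currentBudget < amount) {
      await client.query('ROLLBACK');
      return { success: false, error: 'Insufficient budget' };
    }

    // Atomically update budget and create audit trail
    await client.query(
      'UPDATE contexts SET available_budget = available_budget - $1 WHERE id = $2',
      [amount, contextId]
    );

    await client.query(
      'INSERT INTO operation_ledger (context_id, user_id, amount, operation_type, details, idempotency_key) VALUES ($1, $2, $3, $4, $5, $6)',
      [contextId, userId, -amount, 'resource_consumption', details.description, idempotencyKey]
    );

    await client.query('COMMIT');
    newBudget = currentBudget - amount;
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }

  // Execute post-commit actions outside the transaction: the deduction is already
  // durable, so a failed notification must not be reported as a failed deduction
  if (callback) {
    try {
      await callback({
        newBudget,
        amountProcessed: amount,
        source: 'context_budget'
      });
    } catch (callbackError) {
      console.error('Post-commit callback failed', callbackError);
    }
  }

  return { success: true, newBudget };
}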
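
The duplicate branch above reports success but does not return the resulting balance, so a client retrying after a dropped connection learns less than it would have from the original response. One option is to store the outcome of the first attempt and replay it. The in-memory sketch below illustrates only that replay behaviour; the class and field names are hypothetical, and in a database-backed version the stored result would live alongside the `operation_ledger` row.

```javascript
class IdempotentBudget {
  constructor(initialBudget) {
    this.available = initialBudget;
    this.results = new Map(); // idempotencyKey → result of the first successful attempt
  }

  deduct(idempotencyKey, amount) {
    const previous = this.results.get(idempotencyKey);
    if (previous) {
      // Replay the recorded outcome instead of deducting a second time
      return { ...previous, isDuplicate: true };
    }

    if (this.available < amount) {
      // Failures are not recorded, so the same key can be retried after a top-up
      return { success: false, error: 'Insufficient budget' };
    }

    this.available -= amount;
    const result = { success: true, newBudget: this.available };
    this.results.set(idempotencyKey, result);
    return result;
  }
}

const budget = new IdempotentBudget(100);
const first = budget.deduct('op-1', 30);  // deducts 30
const retry = budget.deduct('op-1', 30);  // same key: replayed, nothing deducted
const other = budget.deduct('op-2', 100); // new key, but only 70 is left
```

Whether failed attempts should also be recorded is a design choice: recording them makes retries fully deterministic, while leaving them out (as here) lets a client reuse the key once the budget has been replenished.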

Real-time State Synchronization

After successful operations, broadcast updates to maintain consistency across all client sessions:

const onSuccessCallback = async (result) => {
  const isContextBudget = result.source === 'context_budget';

  // Update current WebSocket connection
  if (connection.readyState === WebSocket.OPEN) {
    if (isContextBudget) {
      connection.send(JSON.stringify({
        type: 'budget_update',
        contextId: contextId,
        newBudget: result.newBudget,
        amountProcessed: result.amountProcessed
      }));
    } else {
      connection.send(JSON.stringify({
        type: 'organization_budget_update',
        newBalance: result.newBalance,
        amountProcessed: result.amountProcessed
      }));
    }
  }

  // Broadcast to all sessions for this user.
  // Context deductions report `newBudget`; organization deductions report `newBalance`.
  broadcastToUserSessions(connection.user.id, {
    type: 'budget_sync',
    newBalance: isContextBudget ? result.newBudget : result.newBalance
  });
};
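
`broadcastToUserSessions` is not shown in the post. A minimal sketch, assuming each server process keeps an in-memory registry of connections per user, might look like the following. The registry and the `registerSession` and `unregisterSession` helpers are hypothetical, and a multi-process deployment would additionally need a shared channel (for example a pub/sub broker) to reach sessions held by other instances.

```javascript
const OPEN = 1; // readyState value of an open socket in the WebSocket API

const sessionsByUser = new Map(); // userId → Set of connections

function registerSession(userId, connection) {
  if (!sessionsByUser.has(userId)) sessionsByUser.set(userId, new Set());
  sessionsByUser.get(userId).add(connection);
}

function unregisterSession(userId, connection) {
  const sessions = sessionsByUser.get(userId);
  if (!sessions) return;
  sessions.delete(connection);
  if (sessions.size === 0) sessionsByUser.delete(userId); // avoid leaking empty sets
}

// Sends the message to every open session of the user; returns how many received it
function broadcastToUserSessions(userId, message) {
  const payload = JSON.stringify(message); // serialize once, send many times
  let delivered = 0;
  for (const connection of sessionsByUser.get(userId) ?? []) {
    if (connection.readyState === OPEN) {
      connection.send(payload);
      delivered += 1;
    }
  }
  return delivered;
}
```

Calling `registerSession` when a socket authenticates and `unregisterSession` from its close handler keeps the registry in step with the set of live connections.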

Performance Optimizations

Several patterns help keep throughput high under load:

  1. Connection Pooling for database connections
  2. Permission Caching with time-based invalidation
  3. Batch Processing for multiple concurrent operations
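  4. Optimistic Locking to reduce contention

// Cached permission checking
// `permissionSystem` is the module required in the permission validation example.
const permissionCache = new Map();
const PERMISSION_CACHE_TTL = 300000; // 5 minutes; also the longest a revoked permission can linger

async function getCachedPermission(userId, permission, organizationId, contextId) {
  const cacheKey = `${userId}:${permission}:${organizationId}:${contextId}`;
  const cached = permissionCache.get(cacheKey);

  if (cached && (Date.now() - cached.timestamp) < PERMISSION_CACHE_TTL) {
    return cached.hasPermission;
  }

  const hasPermission = await permissionSystem.checkPermission(
    userId, permission, organizationId, contextId
  );

  permissionCache.set(cacheKey, {
    hasPermission,
    timestamp: Date.now()
  });

  return hasPermission;
}

// Call when a user's roles change so that a revoked permission
// is not honored until the TTL expires
function invalidateCachedPermissions(userId) {
  const prefix = `${userId}:`;
  for (const key of permissionCache.keys()) {
    if (key.startsWith(prefix)) permissionCache.delete(key);
  }
}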
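
Optimistic locking, the fourth pattern listed above, replaces the `FOR UPDATE` row lock with a version check at write time: read the row, compute the new value, and write only if the version is unchanged, retrying otherwise. The sketch below models this against an in-memory row so that it can run standalone. The `version` field, the retry limit, and the `beforeWrite` hook (used only to simulate a competing writer) are assumptions; against PostgreSQL the compare-and-set step would typically be an `UPDATE ... WHERE id = $1 AND version = $2` whose affected-row count indicates success.

```javascript
// Writes only if nobody else has written since the version was read
function compareAndSet(row, expectedVersion, newBudget) {
  if (row.version !== expectedVersion) return false;
  row.available = newBudget;
  row.version += 1;
  return true;
}

function deductOptimistic(row, amount, { maxRetries = 3, beforeWrite } = {}) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    // Read phase: take a snapshot without holding any lock
    const snapshot = { available: row.available, version: row.version };
    if (snapshot.available < amount) {
      return { success: false, error: 'Insufficient budget', attempts: attempt + 1 };
    }

    if (beforeWrite) beforeWrite(attempt); // hook for simulating a concurrent writer

    // Write phase: succeeds only if the snapshot is still current
    if (compareAndSet(row, snapshot.version, snapshot.available - amount)) {
      return { success: true, newBudget: row.available, attempts: attempt + 1 };
    }
  }
  return { success: false, error: 'Too many conflicts', attempts: maxRetries + 1 };
}

// Simulate a competing deduction of 50 landing between the first read and write
const row = { available: 100, version: 0 };
const outcome = deductOptimistic(row, 30, {
  beforeWrite: (attempt) => {
    if (attempt === 0) compareAndSet(row, row.version, row.available - 50);
  },
});
```

Optimistic locking tends to pay off when conflicts are rare; for a heavily contended budget row, the retries can cost more than the pessimistic lock they replace.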

Testing Concurrent Operations

Comprehensive testing ensures system reliability under concurrent load:

// Concurrent operation stress test
// contextId and userId are assumed to be fixtures created in a setup hook,
// with the context funded with at least 100 units.
describe('Concurrent Resource Operations', () => {
  it('should handle multiple simultaneous operations without conflicts', async () => {
    // Capture the starting balance so the final assertion has a baseline
    const initialBudget = await budgetService.getContextBudget(contextId);
    const operations = [];

    // Simulate 10 concurrent resource consumption requests
    for (let i = 0; i < 10; i++) {
      operations.push(
        budgetService.deductFromContext(
          contextId,
          10,
          userId,
          { description: `Operation ${i}` },
          `test-${i}-${Date.now()}` // unique key per operation
        )
      );
    }

    const results = await Promise.all(operations);

    // Verify all operations succeeded and budget is consistent
    const successfulOps = results.filter(r => r.success);
    expect(successfulOps.length).toBe(10);

    const finalBudget = await budgetService.getContextBudget(contextId);
    expect(finalBudget).toBe(initialBudget - 100);
  });
});
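
The test above covers the case where every request fits in the budget. The more revealing case is over-subscription: when concurrent requests ask for more than is available, exactly the affordable number should succeed and the balance should never go negative. The sketch below checks that invariant against an in-memory stand-in that serializes deductions with a promise chain, playing the role of the row lock. `InMemoryBudget` and `runOversubscriptionCheck` are hypothetical and exist only to make the example runnable without a database or test framework.

```javascript
class InMemoryBudget {
  constructor(initialBudget) {
    this.available = initialBudget;
    this.queue = Promise.resolve(); // tail of the chain of pending deductions
  }

  // Each deduction starts only after the previous one has finished
  deduct(amount) {
    const run = this.queue.then(async () => {
      const current = this.available;
      // Simulate I/O latency between the read and the write
      await new Promise((resolve) => setTimeout(resolve, 1));
      if (current < amount) {
        return { success: false, error: 'Insufficient budget' };
      }
      this.available = current - amount;
      return { success: true, newBudget: this.available };
    });
    this.queue = run.catch(() => {}); // keep the chain usable if one deduction throws
    return run;
  }
}

// Ten concurrent requests for 10 units each against a budget of 50
async function runOversubscriptionCheck() {
  const budget = new InMemoryBudget(50);
  const results = await Promise.all(
    Array.from({ length: 10 }, () => budget.deduct(10))
  );
  return {
    succeeded: results.filter((r) => r.success).length,
    failed: results.filter((r) => !r.success).length,
    finalBudget: budget.available,
  };
}
```

Because the deductions are serialized, five requests succeed, five are rejected, and the balance ends at zero. Without the queue, all ten requests would read a balance of 50 before any of them wrote, which is the lost-update race that the row lock in the database version prevents.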

Architectural Principles

Key lessons for building concurrent financial systems:

  1. Atomic Operations - Use database transactions for financial consistency
  2. Idempotency - Prevent duplicate operations in distributed systems
  3. Permission Caching - Balance security with performance requirements
  4. Callback Patterns - Decouple transaction logic from side effects
  5. Connection Management - Pool connections for WebSocket scalability

This architecture successfully handles thousands of concurrent operations while maintaining financial accuracy and respecting user permissions. Its foundation is the combination of ACID database transactions, real-time WebSocket communication, and a robust permission system.


Explore more architectural patterns in our posts on subscription-based feature gating and system resilience.