Microservices Communication and Integration Patterns

Overview

This document describes how Dhanman's microservices communicate with each other, the patterns used for inter-service communication, and the data flow across bounded contexts. Understanding these patterns is crucial for maintaining system consistency and reliability.


Communication Patterns

1. Synchronous Communication (HTTP/REST)

Used for real-time operations that require immediate responses.

When to Use:

  • User-facing operations requiring immediate feedback
  • Read operations that need fresh data
  • Operations where eventual consistency is not acceptable
  • External API integrations

Example Flow: Invoice Creation with Customer Validation

┌─────────────┐      HTTP GET       ┌─────────────┐
│   Frontend  │─────────────────────▶│    Sales    │
│   (React)   │                      │   Service   │
└─────────────┘                      └──────┬──────┘
                                            │
                                            │ HTTP GET (validate)
                                            │
                                     ┌──────▼──────┐
                                     │   Common    │
                                     │   Service   │
                                     │ (Customers) │
                                     └─────────────┘

Implementation:

// Sales Service - Synchronous call to Common Service
public class CreateInvoiceCommandHandler : IRequestHandler<CreateInvoiceCommand, Result<Guid>>
{
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IInvoiceRepository _invoiceRepository;

    public async Task<Result<Guid>> Handle(
        CreateInvoiceCommand request, 
        CancellationToken cancellationToken)
    {
        // 1. Validate customer exists (synchronous HTTP call)
        var customer = await ValidateCustomerAsync(request.CustomerId, cancellationToken);
        if (customer == null)
            return Result<Guid>.Failure("Customer not found");

        // 2. Create invoice
        var invoice = Invoice.Create(request.CustomerId, request.Amount);
        await _invoiceRepository.AddAsync(invoice, cancellationToken);

        // 3. Publish event for async processing
        await PublishInvoiceCreatedEventAsync(invoice);

        return Result<Guid>.Success(invoice.Id);
    }

    private async Task<CustomerDto?> ValidateCustomerAsync(
        Guid customerId, 
        CancellationToken cancellationToken)
    {
        var client = _httpClientFactory.CreateClient("CommonService");
        var response = await client.GetAsync(
            $"api/customers/{customerId}", 
            cancellationToken
        );

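        // Only a 404 means the customer does not exist (requires System.Net).
        if (response.StatusCode == HttpStatusCode.NotFound)
            return null;

        // Any other failure (5xx, 401, ...) is an error, not a missing customer.
        response.EnsureSuccessStatusCode();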

        return await response.Content.ReadFromJsonAsync<CustomerDto>(cancellationToken);
    }
}

Resilience Pattern:

// HTTP Client with Polly resilience
services.AddHttpClient("CommonService", client =>
{
    client.BaseAddress = new Uri(configuration["Services:Common:Url"]);
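    // HttpClient.Timeout bounds the whole call, including every retry and the
    // waits between retries (2 + 4 + 8 s below), so it must exceed that total.
    client.Timeout = TimeSpan.FromSeconds(30);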
})
.AddPolicyHandler(Policy<HttpResponseMessage>
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)))
)
.AddPolicyHandler(Policy<HttpResponseMessage>
    .Handle<HttpRequestException>()
    .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30))
);

2. Asynchronous Communication (Event-Driven)

Used for operations that can be processed eventually and don't require an immediate response.

When to Use:

  • Cross-service data synchronization
  • Triggering workflows in other services
  • Audit logging and analytics
  • Non-critical operations
  • Operations that can be retried

Example Flow: Invoice Created → Ledger Update

┌─────────────┐                    ┌─────────────────┐
│    Sales    │                    │    RabbitMQ     │
│   Service   │───── Publish ─────▶│  dhanman.events │
│             │   InvoiceCreated   │   (Exchange)    │
└─────────────┘                    └────────┬────────┘
                                            │ Fan-out
                                   ┌────────┴────────┐
                                   │                 │
                        ┌──────────▼─────┐  ┌───────▼────────┐
                        │     Common     │  │  Notification  │
                        │    Service     │  │    Service     │
                        │  (Consumes)    │  │  (Consumes)    │
                        └────────────────┘  └────────────────┘

Event Publishing:

// Sales Service - Publishing event after invoice creation
public class CreateInvoiceCommandHandler : IRequestHandler<CreateInvoiceCommand, Result<Guid>>
{
    private readonly IEventPublisher _eventPublisher;
    private readonly IInvoiceRepository _invoiceRepository;

    public async Task<Result<Guid>> Handle(
        CreateInvoiceCommand request, 
        CancellationToken cancellationToken)
    {
        // 1. Create and save invoice
        var invoice = Invoice.Create(request.CustomerId, request.Amount, request.DueDate);
        await _invoiceRepository.AddAsync(invoice, cancellationToken);

        // 2. Publish domain event
        var @event = new InvoiceCreatedEvent
        {
            InvoiceId = invoice.Id,
            CustomerId = invoice.CustomerId,
            Amount = invoice.Amount,
            DueDate = invoice.DueDate,
            CreatedAt = DateTime.UtcNow
        };

        await _eventPublisher.PublishAsync(@event, cancellationToken);

        return Result<Guid>.Success(invoice.Id);
    }
}

Event Consumption:

// Common Service - Consuming event to update ledger
public class InvoiceCreatedEventHandler : IMessageHandler<InvoiceCreatedEvent>
{
    private readonly ILedgerService _ledgerService;
    private readonly ILogger<InvoiceCreatedEventHandler> _logger;

    public async Task HandleAsync(
        InvoiceCreatedEvent @event, 
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing InvoiceCreatedEvent for Invoice {InvoiceId}", 
            @event.InvoiceId
        );

        try
        {
            // Create debit entry (Accounts Receivable)
            await _ledgerService.CreateEntryAsync(new LedgerEntry
            {
                AccountCode = "1200", // Accounts Receivable
                Type = EntryType.Debit,
                Amount = @event.Amount,
                ReferenceId = @event.InvoiceId,
                ReferenceType = "Invoice",
                Description = $"Invoice created for customer {@event.CustomerId}"
            });

            // Create credit entry (Revenue)
            await _ledgerService.CreateEntryAsync(new LedgerEntry
            {
                AccountCode = "4000", // Revenue
                Type = EntryType.Credit,
                Amount = @event.Amount,
                ReferenceId = @event.InvoiceId,
                ReferenceType = "Invoice",
                Description = $"Revenue from invoice {@event.InvoiceId}"
            });

            _logger.LogInformation(
                "Ledger entries created for Invoice {InvoiceId}", 
                @event.InvoiceId
            );
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex, 
                "Error processing InvoiceCreatedEvent for Invoice {InvoiceId}", 
                @event.InvoiceId
            );
            throw; // Rethrow so the consumer's retry mechanism can redeliver the message
        }
    }
}

3. Command Pattern (Point-to-Point)

Used for directing specific actions to a particular service.

Example Flow: Send Email Command

┌─────────────┐                    ┌──────────────────┐
│    Sales    │                    │     RabbitMQ     │
│   Service   │─── Publish ───────▶│ dhanman.commands │
│             │  SendEmailCommand  │   (Exchange)     │
└─────────────┘                    └────────┬─────────┘
                                            │ Direct routing
                                            │ (notification.commands)
                                    ┌───────▼──────────┐
                                    │   Notification   │
                                    │     Service      │
                                    │   (Consumes)     │
                                    └──────────────────┘

Command Publishing:

public class SendInvoiceEmailCommand
{
    public Guid InvoiceId { get; set; }
    public Guid CustomerId { get; set; }
    public string EmailTemplate { get; set; }
}

// Sales Service
public class InvoiceEmailService
{
    private readonly ICommandPublisher _commandPublisher;

    public async Task SendInvoiceEmailAsync(Guid invoiceId, Guid customerId)
    {
        var command = new SendInvoiceEmailCommand
        {
            InvoiceId = invoiceId,
            CustomerId = customerId,
            EmailTemplate = "invoice-created"
        };

        await _commandPublisher.PublishAsync(
            command,
            routingKey: "notification.commands"
        );
    }
}

Communication Scenarios

Scenario 1: Monthly Invoice Generation

Flow:

1. Hangfire Recurring Job (Sales Service)
   │
   ├─▶ 2. Query all active residents (HTTP to Community Service)
   │
   ├─▶ 3. For each resident:
   │      ├─▶ Create invoice (Local DB)
   │      ├─▶ Publish InvoiceCreatedEvent (RabbitMQ)
   │      └─▶ Schedule payment reminder (Hangfire Delayed Job)
   │
   ├─▶ 4. Common Service receives events:
   │      └─▶ Update ledger entries
   │
   └─▶ 5. Notification Service receives events:
          └─▶ Send invoice emails

Implementation:

public class MonthlyInvoiceGenerationJob
{
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IInvoiceService _invoiceService;
    private readonly IBackgroundJobClient _hangfireClient;
    private readonly ILogger<MonthlyInvoiceGenerationJob> _logger;

    [AutomaticRetry(Attempts = 0)] // Don't retry, run next month
    public async Task GenerateMonthlyInvoices(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting monthly invoice generation");

        // 1. Get active residents from Community Service (Synchronous)
        var residents = await GetActiveResidentsAsync(cancellationToken);

        // 2. Generate invoices (with events published)
        var results = new List<(Guid ResidentId, Result<Guid> Result)>();

        foreach (var resident in residents)
        {
            var result = await _invoiceService.GenerateMonthlyInvoiceAsync(
                resident.Id, 
                resident.MonthlyCharges,
                cancellationToken
            );

            results.Add((resident.Id, result));

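            if (result.IsSuccess)
            {
                // 3. Schedule a reminder 27 days from now, i.e. 3 days before
                //    the due date (assumes invoices fall due in 30 days)
                var invoiceId = result.Value;
                var reminderDate = DateTime.UtcNow.AddDays(27);
                _hangfireClient.Schedule<ReminderService>(
                    // Hangfire injects its own token when the job runs, so pass
                    // CancellationToken.None rather than this job's token.
                    x => x.SendPaymentReminder(invoiceId, CancellationToken.None),
                    reminderDate
                );
            }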
        }

        // 4. Log summary
        _logger.LogInformation(
            "Monthly invoice generation completed. Success: {Success}, Failed: {Failed}",
            results.Count(r => r.Result.IsSuccess),
            results.Count(r => r.Result.IsFailure)
        );
    }

    private async Task<List<ResidentDto>> GetActiveResidentsAsync(
        CancellationToken cancellationToken)
    {
        var client = _httpClientFactory.CreateClient("CommunityService");
        var response = await client.GetAsync("api/residents/active", cancellationToken);
        response.EnsureSuccessStatusCode();
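        // ReadFromJsonAsync returns null for a JSON "null" body; fall back to an empty list.
        return await response.Content.ReadFromJsonAsync<List<ResidentDto>>(cancellationToken)
            ?? new List<ResidentDto>();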
    }
}

Scenario 2: Purchase Order Approval Workflow

Flow:

1. User submits PO (Frontend → Purchase Service)
   │
   ├─▶ 2. Purchase Service:
   │      ├─▶ Create PO (Local DB)
   │      └─▶ Publish POSubmittedEvent (RabbitMQ)
   │
   ├─▶ 3. Notification Service:
   │      └─▶ Send approval request emails to approvers
   │
   ├─▶ 4. Approver approves (Frontend → Purchase Service)
   │      ├─▶ Update PO status (Local DB)
   │      └─▶ Publish POApprovedEvent (RabbitMQ)
   │
   ├─▶ 5. Common Service:
   │      └─▶ Create budget reservation ledger entry
   │
   └─▶ 6. Notification Service:
          └─▶ Send approval confirmation to requester

Implementation:

// Step 1: Submit PO
public class SubmitPurchaseOrderCommandHandler 
    : IRequestHandler<SubmitPurchaseOrderCommand, Result<Guid>>
{
    public async Task<Result<Guid>> Handle(
        SubmitPurchaseOrderCommand request, 
        CancellationToken cancellationToken)
    {
        var po = PurchaseOrder.Create(request.VendorId, request.Items);
        po.Submit();

        await _repository.AddAsync(po, cancellationToken);

        // Publish event
        await _eventPublisher.PublishAsync(new POSubmittedEvent
        {
            POId = po.Id,
            VendorId = po.VendorId,
            Amount = po.TotalAmount,
            RequiresApproval = po.TotalAmount > 50000,
            Approvers = po.TotalAmount > 50000 
                ? new[] { "manager@company.com", "cfo@company.com" }
                : new[] { "manager@company.com" }
        });

        return Result<Guid>.Success(po.Id);
    }
}

// Step 3: Notification Service consumes event
public class POSubmittedEventHandler : IMessageHandler<POSubmittedEvent>
{
    private readonly IEmailService _emailService;

    public async Task HandleAsync(
        POSubmittedEvent @event, 
        CancellationToken cancellationToken)
    {
        if (@event.RequiresApproval)
        {
            foreach (var approver in @event.Approvers)
            {
                await _emailService.SendAsync(new EmailMessage
                {
                    To = approver,
                    Subject = $"Purchase Order {@event.POId} Requires Approval",
                    Template = "po-approval-required",
                    Data = new { POId = @event.POId, Amount = @event.Amount }
                });
            }
        }
    }
}

// Step 4: Approve PO
public class ApprovePurchaseOrderCommandHandler 
    : IRequestHandler<ApprovePurchaseOrderCommand, Result>
{
    public async Task<Result> Handle(
        ApprovePurchaseOrderCommand request, 
        CancellationToken cancellationToken)
    {
        var po = await _repository.GetByIdAsync(request.POId, cancellationToken);
        if (po == null)
            return Result.Failure("PO not found");

        var result = po.Approve(request.ApproverId, request.Comments);
        if (result.IsFailure)
            return result;

        await _repository.UpdateAsync(po, cancellationToken);

        // Publish approved event
        await _eventPublisher.PublishAsync(new POApprovedEvent
        {
            POId = po.Id,
            ApprovedBy = request.ApproverId,
            ApprovedAt = DateTime.UtcNow,
            Amount = po.TotalAmount
        });

        return Result.Success();
    }
}

// Step 5: Common Service creates ledger entry
public class POApprovedEventHandler : IMessageHandler<POApprovedEvent>
{
    private readonly ILedgerService _ledgerService;

    public async Task HandleAsync(
        POApprovedEvent @event, 
        CancellationToken cancellationToken)
    {
        // Create budget reservation entry
        await _ledgerService.CreateEntryAsync(new LedgerEntry
        {
            AccountCode = "2100", // Accounts Payable
            Type = EntryType.Credit,
            Amount = @event.Amount,
            ReferenceId = @event.POId,
            ReferenceType = "PurchaseOrder",
            Description = $"Budget reserved for approved PO {@event.POId}"
        });
    }
}

Scenario 3: Payment Processing with Saga

Flow:

1. User makes payment (Frontend → Sales Service)
   │
   ├─▶ 2. Sales Service (Saga Orchestrator):
   │      ├─▶ Validate invoice exists (Local DB)
   │      ├─▶ Process payment via gateway (HTTP to Payment Gateway)
   │      └─▶ Mark invoice as paid (Local DB)
   │
   ├─▶ 3. If successful:
   │      ├─▶ Publish PaymentReceivedEvent (RabbitMQ)
   │      │   │
   │      │   ├─▶ Common Service: Update ledger
   │      │   └─▶ Notification Service: Send receipt
   │      │
   │      └─▶ Update customer balance (HTTP to Common Service)
   │
   └─▶ 4. If failed:
          └─▶ Rollback: Mark payment as failed, don't publish events

Implementation:

public class ProcessPaymentSaga
{
    private readonly IInvoiceRepository _invoiceRepository;
    private readonly IPaymentGateway _paymentGateway;
    private readonly IEventPublisher _eventPublisher;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ILogger<ProcessPaymentSaga> _logger;

    public async Task<Result<Guid>> ExecuteAsync(
        ProcessPaymentCommand command,
        CancellationToken cancellationToken)
    {
        // Step 1: Get invoice
        var invoice = await _invoiceRepository.GetByIdAsync(
            command.InvoiceId, 
            cancellationToken
        );

        if (invoice == null)
            return Result<Guid>.Failure("Invoice not found");

        if (invoice.IsPaid)
            return Result<Guid>.Failure("Invoice already paid");

        // Step 2: Process payment via gateway
        PaymentResult paymentResult;
        try
        {
            paymentResult = await _paymentGateway.ProcessPaymentAsync(new PaymentRequest
            {
                Amount = invoice.Amount,
                Currency = "USD",
                PaymentMethodId = command.PaymentMethodId,
                IdempotencyKey = command.IdempotencyKey
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Payment gateway error for invoice {InvoiceId}", invoice.Id);
            return Result<Guid>.Failure("Payment processing failed");
        }

        if (!paymentResult.IsSuccess)
        {
            return Result<Guid>.Failure($"Payment failed: {paymentResult.ErrorMessage}");
        }

        // Step 3: Mark invoice as paid
        var payment = Payment.Create(
            invoice.Id,
            invoice.Amount,
            paymentResult.TransactionId,
            PaymentMethod.CreditCard
        );

        invoice.MarkAsPaid(payment);
        await _invoiceRepository.UpdateAsync(invoice, cancellationToken);

        // Step 4: Update customer balance (HTTP call)
        await UpdateCustomerBalanceAsync(
            invoice.CustomerId, 
            -invoice.Amount, 
            cancellationToken
        );

        // Step 5: Publish success event
        await _eventPublisher.PublishAsync(new PaymentReceivedEvent
        {
            PaymentId = payment.Id,
            InvoiceId = invoice.Id,
            CustomerId = invoice.CustomerId,
            Amount = invoice.Amount,
            TransactionId = paymentResult.TransactionId,
            ProcessedAt = DateTime.UtcNow
        });

        return Result<Guid>.Success(payment.Id);
    }

    private async Task UpdateCustomerBalanceAsync(
        Guid customerId,
        decimal amountDelta,
        CancellationToken cancellationToken)
    {
        var client = _httpClientFactory.CreateClient("CommonService");
        var response = await client.PostAsJsonAsync(
            $"api/customers/{customerId}/update-balance",
            new { AmountDelta = amountDelta },
            cancellationToken
        );

        if (!response.IsSuccessStatusCode)
        {
            _logger.LogWarning(
                "Failed to update customer balance for {CustomerId}", 
                customerId
            );
            // Don't fail the saga, balance will sync via events eventually
        }
    }
}
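
The saga above returns early when the gateway call fails, which covers the rollback in step 4 of the flow. It does not show what happens if a later step fails after the charge has gone through, for example if `_invoiceRepository.UpdateAsync` throws. One common way to handle that case is to pair each step with a compensating action and run the compensations in reverse order on failure. The following is a minimal sketch of that idea, not Dhanman's implementation; `SagaStep` and `SagaRunner` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type: one saga step plus the action that undoes it.
public sealed record SagaStep(
    string Name,
    Func<CancellationToken, Task> Execute,
    Func<CancellationToken, Task> Compensate);

public static class SagaRunner
{
    // Runs the steps in order. Returns null on success, or the name of the
    // step that failed after all completed steps have been compensated.
    public static async Task<string?> RunAsync(
        IReadOnlyList<SagaStep> steps,
        CancellationToken cancellationToken)
    {
        var completed = new Stack<SagaStep>();

        foreach (var step in steps)
        {
            try
            {
                await step.Execute(cancellationToken);
                completed.Push(step);
            }
            catch (Exception)
            {
                // Undo completed steps in reverse order. Compensation ignores
                // the caller's token so that a cancelled request still cleans up.
                while (completed.Count > 0)
                    await completed.Pop().Compensate(CancellationToken.None);

                return step.Name;
            }
        }

        return null;
    }
}
```

In the payment scenario, the "charge" step would be paired with a refund through the gateway and the "mark as paid" step with reverting the invoice status. Whether the gateway supports such a refund call is an assumption that this document does not confirm.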

Integration Patterns Summary

Pattern Selection Guide

Pattern           Use Case                  Consistency  Performance  Complexity
----------------  ------------------------  -----------  -----------  ----------
Synchronous HTTP  Real-time validation      Strong       Medium       Low
Async Events      Cross-service workflows   Eventual     High         Medium
Commands          Directed actions          Eventual     High         Medium
Saga              Distributed transactions  Eventual     Medium       High

Event Types

Event                 Publisher  Consumers             Purpose
--------------------  ---------  --------------------  ---------------------------------
InvoiceCreatedEvent   Sales      Common, Notification  Ledger update, email notification
PaymentReceivedEvent  Sales      Common, Notification  Ledger update, receipt email
BillCreatedEvent      Purchase   Common                Ledger update
POApprovedEvent       Purchase   Common, Notification  Budget reservation, notifications
SalaryPostedEvent     Payroll    Common, Notification  Ledger update, payslip generation
UserCreatedEvent      Common     All services          User sync across services

Best Practices

Do's ✅

  • Use async events for cross-service workflows
  • Implement idempotency for all event handlers
  • Use correlation IDs for tracing
  • Implement proper error handling and DLQ
  • Version your events for backward compatibility
  • Use HTTP for real-time validations
  • Implement circuit breakers for external calls
  • Monitor message queue health
  • Log all cross-service communication
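
Idempotency matters here because handlers such as `InvoiceCreatedEventHandler` rethrow on failure so the message is retried, and a redelivered event could otherwise post the same ledger entries twice. The sketch below shows one possible shape for a deduplicating wrapper. `IProcessedEventStore`, `InMemoryProcessedEventStore`, and `IdempotentHandler` are hypothetical names, and using a `Guid` such as `InvoiceId` as the deduplication key is an assumption, since the events in this document do not show a dedicated event ID.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical store of already-processed event IDs. A real service would
// likely back this with a table that has a unique constraint on the ID.
public interface IProcessedEventStore
{
    // Returns true only the first time a given ID is recorded.
    Task<bool> TryMarkProcessedAsync(Guid eventId, CancellationToken cancellationToken);

    // Removes the marker so a failed event can be retried.
    Task ReleaseAsync(Guid eventId, CancellationToken cancellationToken);
}

public sealed class InMemoryProcessedEventStore : IProcessedEventStore
{
    private readonly ConcurrentDictionary<Guid, bool> _seen = new();

    public Task<bool> TryMarkProcessedAsync(Guid eventId, CancellationToken cancellationToken)
        => Task.FromResult(_seen.TryAdd(eventId, true));

    public Task ReleaseAsync(Guid eventId, CancellationToken cancellationToken)
    {
        _seen.TryRemove(eventId, out _);
        return Task.CompletedTask;
    }
}

// Wraps a handler delegate so that a redelivered event is skipped.
public sealed class IdempotentHandler<TEvent>
{
    private readonly IProcessedEventStore _store;
    private readonly Func<TEvent, Guid> _getEventId;
    private readonly Func<TEvent, CancellationToken, Task> _inner;

    public IdempotentHandler(
        IProcessedEventStore store,
        Func<TEvent, Guid> getEventId,
        Func<TEvent, CancellationToken, Task> inner)
    {
        _store = store;
        _getEventId = getEventId;
        _inner = inner;
    }

    // Returns true if the event was handled, false if it was a duplicate.
    public async Task<bool> HandleAsync(TEvent @event, CancellationToken cancellationToken)
    {
        var eventId = _getEventId(@event);
        if (!await _store.TryMarkProcessedAsync(eventId, cancellationToken))
            return false;

        try
        {
            await _inner(@event, cancellationToken);
            return true;
        }
        catch
        {
            // Release the marker so the retry mechanism can try again.
            await _store.ReleaseAsync(eventId, CancellationToken.None);
            throw;
        }
    }
}
```

The in-memory store only illustrates the control flow. For real guarantees the marker and the handler's side effects would need to be committed in the same database transaction.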

Don'ts ❌

  • Don't make synchronous calls in event handlers
  • Don't create circular dependencies
  • Don't skip retry mechanisms
  • Don't ignore dead-letter queues
  • Don't send large payloads in events (use IDs)
  • Don't create tight coupling through synchronous calls
  • Don't forget timeout configurations
  • Don't skip correlation IDs
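
For the HTTP side, one way to avoid skipping correlation IDs is to attach them in a `DelegatingHandler` so that individual call sites cannot forget. The following is a minimal sketch under stated assumptions: the `X-Correlation-ID` header name is a common convention rather than something this document specifies, and `CorrelationIdHandler` and `StubTransport` are hypothetical names. `StubTransport` only stands in for the network so that the example runs on its own.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Adds a correlation ID header to every outgoing request that lacks one.
public sealed class CorrelationIdHandler : DelegatingHandler
{
    public const string HeaderName = "X-Correlation-ID"; // assumed header name

    private readonly Func<string?> _currentCorrelationId;

    // The delegate returns the ID of the request being served, or null if none.
    public CorrelationIdHandler(Func<string?> currentCorrelationId)
        => _currentCorrelationId = currentCorrelationId;

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        if (!request.Headers.Contains(HeaderName))
        {
            // Reuse the incoming ID when there is one; otherwise start a new trace.
            var id = _currentCorrelationId() ?? Guid.NewGuid().ToString("N");
            request.Headers.Add(HeaderName, id);
        }

        return base.SendAsync(request, cancellationToken);
    }
}

// Stand-in for the real transport: records the request and returns 200 OK.
public sealed class StubTransport : HttpMessageHandler
{
    public HttpRequestMessage? LastRequest { get; private set; }

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        LastRequest = request;
        return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK));
    }
}
```

With `IHttpClientFactory`, a handler like this can be attached to a named client through `AddHttpMessageHandler`. Carrying the same ID in message headers for RabbitMQ events would follow the same idea but is not shown here.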

Monitoring Communication

Metrics to Track

  • Message processing time
  • Event publish rate
  • HTTP call duration
  • Circuit breaker state
  • Dead-letter queue size
  • Retry attempts
  • Failed messages
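
Several of these metrics can be recorded with the built-in `System.Diagnostics.Metrics` API (.NET 6 and later). The sketch below covers event publish rate, failed messages, and message processing time. The `MessagingMetrics` class, the meter name, and the instrument names are hypothetical, and how the measurements are exported to Grafana is outside the scope of this example.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

public sealed class MessagingMetrics : IDisposable
{
    private readonly Meter _meter;
    private readonly Counter<long> _published;
    private readonly Counter<long> _failed;
    private readonly Histogram<double> _processingMs;

    // The meter and instrument names below are illustrative assumptions.
    public MessagingMetrics(string meterName = "Dhanman.Messaging")
    {
        _meter = new Meter(meterName);
        _published = _meter.CreateCounter<long>("events.published");
        _failed = _meter.CreateCounter<long>("messages.failed");
        _processingMs = _meter.CreateHistogram<double>(
            "message.processing.duration", unit: "ms");
    }

    // Call once per published event; a rate can be derived from the counter.
    public void EventPublished(string eventType)
        => _published.Add(1, Tag(eventType));

    // Times a handler invocation and counts it as failed if it throws.
    public async Task TrackAsync(string eventType, Func<Task> handler)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await handler();
        }
        catch
        {
            _failed.Add(1, Tag(eventType));
            throw;
        }
        finally
        {
            _processingMs.Record(stopwatch.Elapsed.TotalMilliseconds, Tag(eventType));
        }
    }

    private static KeyValuePair<string, object?> Tag(string eventType)
        => new("event.type", eventType);

    public void Dispose() => _meter.Dispose();
}
```

A consumer could wrap each handler call as `await metrics.TrackAsync("InvoiceCreatedEvent", () => handler.HandleAsync(@event, cancellationToken))`.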

Grafana Dashboards

  • Inter-service call latency
  • Event processing throughput
  • Failed message count
  • Circuit breaker trips
  • Queue depth


Summary

Dhanman's microservices communicate through:

  • Synchronous HTTP for real-time operations
  • Asynchronous events for cross-service workflows
  • Commands for directed actions
  • Sagas for distributed transactions

This combination provides flexibility, scalability, and resilience while maintaining data consistency across services.