Consumer

📌 Purpose

The CampaignExecutionConsumer component is responsible for detecting and launching the execution of CampaignExecution arrow-up-rightentities that are in the NEW status. It is typically used together with the CampaignExecutionProducerarrow-up-right, which creates new executions on a schedule.


🔧 Class: CampaignExecutionConsumer

@Component
public class CampaignExecutionConsumer {

    @Autowired
    private CampaignExecutionRepository executionRepository;

    @Autowired
    private CampaignExecutionService executionService;

    @Autowired
    private TransactionTemplateBuilder ttb;

    @SchedulerLock(name = "CampaignExecutionConsumer_consume", lockAtMostFor = "PT5M", lockAtLeastFor = "PT30S")
    @Scheduled(initialDelay = 300_000, fixedDelayString = "${campaign.execution.consumer.fixedDelay}")
    public void consume() {
        executionRepository.findNewExecutionId()
            .ifPresent(id -> {
                ttb.requiresNew().executeWithoutResult(s -> {
                    CampaignExecution execution = executionRepository.getSync(id);
                    if (execution.getStatus() == CampaignExecutionStatus.NEW) {
                        executionService.runExecution(execution);
                    }
                });
            });
    }
}

🔁 Execution Frequency and Safety

  • Triggered periodically via @Scheduled; interval configured in application.properties:

  • Uses Shedlock via @SchedulerLock to ensure only one service instance executes in clustered environments.


⚙️ Core Logic

  1. Find a CampaignExecutionarrow-up-right with status NEW:

    • Retrieves the latest NEW execution (ordered by id desc)

  2. Load the execution using getSync(id):

    • Ensures consistent access via SynchronousAccessRepository

    • Execution will only proceed after any ongoing transaction completes

  3. Verify execution status:

    • Prevents race conditions and duplicate execution

  4. Launch the execution:

  5. Execute in an isolated transaction:

    • Ensures full isolation from external transaction context, improving consistency and preventing side effects


💥 Error Handling

Errors within runExecution(...) are not swallowed. Instead, they are logged to an ExceptionEntity within CampaignExecutionServicearrow-up-right. As a result, the user can view the error message and stack trace in the UI.


🧪 Example Behavior

  • Campaign A

    • executionType = AUTOMATIC

  • Producer:

    • Creates CampaignExecution(id=100) with status NEW

  • Consumer:

    • Finds id = 100

    • Loads it via getSync(100)

    • Verifies status is NEW

    • Executes the campaign

    • Updates the status to FINISHED or EXCEPTION_OCCURRED


📌 Summary

  • CampaignExecutionConsumer automatically executes campaigns

  • Detection: Searches for the first CampaignExecutionarrow-up-right in NEW status

  • Isolation: Execution runs in a new transaction using TransactionTemplate

  • Safety: @SchedulerLock prevents duplicate executions in clustered setups

  • Consistency: getSync() ensures serialized access in concurrent environments

  • Error Logging: Exceptions are saved to ExceptionEntity and shown in the UI

Last updated

Was this helpful?