Consumer
📌 Purpose
The CampaignExecutionConsumer component is responsible for detecting and launching the execution of CampaignExecution entities that are in the NEW status.
It is typically used together with the CampaignExecutionProducer, which creates new executions on a schedule.
🔧 Class: CampaignExecutionConsumer
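    import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    // Imports of the project-specific classes are omitted here.
    @Component
    public class CampaignExecutionConsumer {

        @Autowired
        private CampaignExecutionRepository executionRepository;

        @Autowired
        private CampaignExecutionService executionService;

        @Autowired
        private TransactionTemplateBuilder ttb;

        // First run 5 minutes after startup, then with the configured delay between runs.
        // The lock keeps other instances from running the same job concurrently.
        @SchedulerLock(name = "CampaignExecutionConsumer_consume", lockAtMostFor = "PT5M", lockAtLeastFor = "PT30S")
        @Scheduled(initialDelay = 300_000, fixedDelayString = "${campaign.execution.consumer.fixedDelay}")
        public void consume() {
            executionRepository.findNewExecutionId()
                .ifPresent(id -> {
                    // Run in a new transaction, independent of any surrounding one.
                    ttb.requiresNew().executeWithoutResult(s -> {
                        CampaignExecution execution = executionRepository.getSync(id);
                        // Re-check the status after loading to avoid running an execution twice.
                        if (execution.getStatus() == CampaignExecutionStatus.NEW) {
                            executionService.runExecution(execution);
                        }
                    });
                });
        }
    }

🔁 Execution Frequency and Safety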
- Triggered periodically via @Scheduled; the interval is configured in application.properties (in milliseconds, so 60000 means 60 seconds between the end of one run and the start of the next):
    campaign.execution.consumer.fixedDelay=60000
- Uses ShedLock via @SchedulerLock to ensure only one service instance executes in clustered environments.
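
To make the effect of lockAtMostFor and lockAtLeastFor more concrete, here is a minimal in-memory sketch of the locking idea. It is not ShedLock's implementation: SimpleLockTable and its methods are hypothetical names invented for this illustration, and a real setup keeps the lock in shared storage such as a database table.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for a shared lock table (hypothetical, for illustration only). */
class SimpleLockTable {
    // lock name -> instant until which the lock counts as held
    private final Map<String, Instant> lockedUntil = new HashMap<>();

    /** Tries to take the lock; it expires on its own after lockAtMostFor. */
    synchronized boolean tryLock(String name, Instant now, Duration lockAtMostFor) {
        Instant until = lockedUntil.get(name);
        if (until != null && until.isAfter(now)) {
            return false; // another instance still holds the lock
        }
        lockedUntil.put(name, now.plus(lockAtMostFor));
        return true;
    }

    /** Releases the lock, but never earlier than lockedAt + lockAtLeastFor. */
    synchronized void unlock(String name, Instant lockedAt, Instant now, Duration lockAtLeastFor) {
        Instant earliestRelease = lockedAt.plus(lockAtLeastFor);
        lockedUntil.put(name, now.isAfter(earliestRelease) ? now : earliestRelease);
    }
}
```

With the values from the annotation above, an instance that finishes after 5 seconds still blocks other instances until 30 seconds have passed since it took the lock, and an instance that crashes without unlocking blocks them for at most 5 minutes.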
⚙️ Core Logic
1. Find a CampaignExecution with status NEW:
    executionRepository.findNewExecutionId()
   - Retrieves the latest NEW execution (ordered by id desc)
2. Load the execution using getSync(id):
    CampaignExecution execution = executionRepository.getSync(id);
   - Ensures consistent access via SynchronousAccessRepository
   - Execution will only proceed after any ongoing transaction completes
3. Verify execution status:
    if (execution.getStatus() == CampaignExecutionStatus.NEW)
   - Prevents race conditions and duplicate execution
4. Launch the execution:
    executionService.runExecution(execution);
5. Steps 2 to 4 run inside an isolated transaction:
    ttb.requiresNew().executeWithoutResult(...)
   - Ensures full isolation from any external transaction context, improving consistency and preventing side effects
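
The find, reload, re-check, launch sequence can be sketched in plain Java without Spring or a database. This is only an illustration under stated assumptions: ConsumerSketch, ExecutionStatus, and runIfStillNew are hypothetical names, a TreeMap stands in for the repository, and the transaction and getSync locking are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical status enum; the names mirror statuses mentioned in this page.
enum ExecutionStatus { NEW, FINISHED, EXCEPTION_OCCURRED }

/** In-memory stand-in for the repository and the service (hypothetical). */
class ConsumerSketch {
    private final TreeMap<Long, ExecutionStatus> executions = new TreeMap<>();
    final List<Long> launched = new ArrayList<>();

    void add(long id, ExecutionStatus status) { executions.put(id, status); }

    ExecutionStatus statusOf(long id) { return executions.get(id); }

    /** Step 1: id of the latest NEW execution (highest id first). */
    Optional<Long> findNewExecutionId() {
        return executions.descendingMap().entrySet().stream()
                .filter(e -> e.getValue() == ExecutionStatus.NEW)
                .map(Map.Entry::getKey)
                .findFirst();
    }

    /** Steps 2 to 4: reload the execution, re-check its status, then launch it. */
    boolean runIfStillNew(long id) {
        ExecutionStatus current = executions.get(id); // reload the current state
        if (current != ExecutionStatus.NEW) {
            return false; // someone else already picked it up
        }
        launched.add(id); // stands in for executionService.runExecution(...)
        executions.put(id, ExecutionStatus.FINISHED);
        return true;
    }

    /** One scheduler tick: at most one execution is launched. */
    boolean consume() {
        return findNewExecutionId().map(this::runIfStillNew).orElse(false);
    }
}
```

In this sketch each call to consume() handles at most one execution, and calling runIfStillNew with an id that was already processed does nothing, which is the purpose of the status check in step 3.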
💥 Error Handling
Errors thrown within runExecution(...) are not silently swallowed.
Instead, CampaignExecutionService records them in an ExceptionEntity.
As a result, the user can view the error message and stack trace in the UI.
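
As a rough illustration of this pattern, the sketch below captures an exception's message and stack trace and marks the execution as failed. It does not show the actual CampaignExecutionService code: ErrorRecord and ExecutionRunner are hypothetical stand-ins, and persistence is replaced by a plain field.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical stand-in for the persisted error record. */
class ErrorRecord {
    final String message;
    final String stackTrace;

    ErrorRecord(Throwable t) {
        this.message = t.getMessage();
        // Render the stack trace to a string so it can be stored and displayed later.
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        this.stackTrace = buffer.toString();
    }
}

/** Hypothetical runner that records failures instead of losing them. */
class ExecutionRunner {
    String status = "NEW";
    ErrorRecord lastError; // null while no failure has been recorded

    void run(Runnable campaign) {
        try {
            campaign.run();
            status = "FINISHED";
        } catch (RuntimeException e) {
            lastError = new ErrorRecord(e); // a real service would persist this
            status = "EXCEPTION_OCCURRED";
        }
    }
}
```

Storing the rendered stack trace next to the message is what would allow a UI to show both without access to the server logs.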
🧪 Example Behavior
Campaign A
executionType = AUTOMATIC
Producer:
- Creates CampaignExecution(id=100) with status NEW
Consumer:
- Finds id = 100
- Loads it via getSync(100)
- Verifies status is NEW
- Executes the campaign
- Updates the status to FINISHED or EXCEPTION_OCCURRED
📌 Summary
CampaignExecutionConsumer automatically executes campaigns.
- Detection: Searches for the latest CampaignExecution in NEW status
- Isolation: Execution runs in a new transaction using TransactionTemplate
- Safety: @SchedulerLock prevents duplicate executions in clustered setups
- Consistency: getSync() ensures serialized access in concurrent environments
- Error Logging: Exceptions are saved to ExceptionEntity and shown in the UI