Consumer
📌 Purpose
The CampaignExecutionConsumer component is responsible for detecting and launching the execution of CampaignExecution entities that are in the NEW status.
It is typically used together with the CampaignExecutionProducer, which creates new executions on a schedule.
🔧 Class: CampaignExecutionConsumer
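import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// Project-specific imports (CampaignExecution, repository, service, TransactionTemplateBuilder) are omitted.

@Component
public class CampaignExecutionConsumer {

    @Autowired
    private CampaignExecutionRepository executionRepository;

    @Autowired
    private CampaignExecutionService executionService;

    @Autowired
    private TransactionTemplateBuilder ttb;

    // ShedLock allows only one service instance to run this job at a time.
    @SchedulerLock(name = "CampaignExecutionConsumer_consume", lockAtMostFor = "PT5M", lockAtLeastFor = "PT30S")
    @Scheduled(initialDelay = 300_000, fixedDelayString = "${campaign.execution.consumer.fixedDelay}")
    public void consume() {
        executionRepository.findNewExecutionId()
                .ifPresent(id -> {
                    // Load, re-check, and run inside a new transaction.
                    ttb.requiresNew().executeWithoutResult(s -> {
                        CampaignExecution execution = executionRepository.getSync(id);
                        // The status may have changed since the id was found, so check it again.
                        if (execution.getStatus() == CampaignExecutionStatus.NEW) {
                            executionService.runExecution(execution);
                        }
                    });
                });
    }
}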
🔁 Execution Frequency and Safety
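Triggered periodically via @Scheduled; the interval is configured in application.properties: campaign.execution.consumer.fixedDelay=60000 (milliseconds). The first run starts 300,000 ms (5 minutes) after startup (initialDelay).
Uses ShedLock via @SchedulerLock to ensure that only one service instance executes the job in clustered environments. The lock is held for at least 30 seconds (lockAtLeastFor) and at most 5 minutes (lockAtMostFor).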
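The annotation values mix two notations: the @SchedulerLock durations are ISO-8601 strings, while the @Scheduled delays are plain milliseconds. The following minimal sketch converts between the two so the numbers can be compared directly. The class name SchedulerTimings is hypothetical and not part of the documented codebase; only java.time.Duration from the standard library is used.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of the documented codebase.
class SchedulerTimings {

    // Converts an ISO-8601 duration string such as "PT5M" to milliseconds.
    static long toMillis(String isoDuration) {
        return Duration.parse(isoDuration).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toMillis("PT5M"));            // lockAtMostFor  -> 300000
        System.out.println(toMillis("PT30S"));           // lockAtLeastFor -> 30000
        System.out.println(Duration.ofMillis(300_000));  // initialDelay   -> PT5M
        System.out.println(Duration.ofMillis(60_000));   // fixedDelay     -> PT1M
    }
}
```

With these values, the 30-second minimum lock is shorter than the 60-second fixed delay, and the 5-minute maximum bounds how long the lock survives if the instance holding it stops unexpectedly.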
⚙️ Core Logic
1. Find a CampaignExecution with status NEW: executionRepository.findNewExecutionId()
   Retrieves the latest NEW execution (ordered by id desc).
2. Load the execution using getSync(id): CampaignExecution execution = executionRepository.getSync(id);
   Ensures consistent access via SynchronousAccessRepository.
   Execution proceeds only after any ongoing transaction completes.
3. Verify the execution status: if (execution.getStatus() == CampaignExecutionStatus.NEW)
   Prevents race conditions and duplicate execution: if the status changed between steps 1 and 2, the execution is skipped.
4. Launch the execution: executionService.runExecution(execution);
5. Steps 2 to 4 run in an isolated transaction: ttb.requiresNew().executeWithoutResult(...)
   Ensures full isolation from any external transaction context, which improves consistency and prevents side effects.
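The find, load, re-check, and run sequence can be tried out without Spring or a database. The sketch below is a simulation under stated assumptions, not the real implementation: the class ConsumeFlowSketch, its in-memory map, and the method runIfStillNew are all hypothetical stand-ins, and running an execution is reduced to setting its status to FINISHED.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// In-memory stand-in for the repository and service; every name here is hypothetical.
class ConsumeFlowSketch {
    enum Status { NEW, FINISHED, EXCEPTION_OCCURRED }

    // Maps execution id to current status, standing in for the database table.
    final Map<Long, Status> executions = new LinkedHashMap<>();

    // Mirrors findNewExecutionId(): the highest id whose status is NEW.
    Optional<Long> findNewExecutionId() {
        return executions.entrySet().stream()
                .filter(e -> e.getValue() == Status.NEW)
                .map(Map.Entry::getKey)
                .max(Comparator.naturalOrder());
    }

    // Mirrors the body of the transaction: reload, re-check the status, then run.
    boolean runIfStillNew(long id) {
        Status current = executions.get(id);        // stands in for getSync(id)
        if (current != Status.NEW) {
            return false;                           // already picked up elsewhere
        }
        executions.put(id, Status.FINISHED);        // stands in for runExecution(...)
        return true;
    }

    // Mirrors consume(): returns true if an execution was run.
    boolean consume() {
        return findNewExecutionId().map(this::runIfStillNew).orElse(false);
    }

    public static void main(String[] args) {
        ConsumeFlowSketch sketch = new ConsumeFlowSketch();
        sketch.executions.put(100L, Status.NEW);
        System.out.println(sketch.consume());             // true: id 100 was NEW and ran
        System.out.println(sketch.executions.get(100L));  // FINISHED
        System.out.println(sketch.consume());             // false: nothing left in NEW
    }
}
```

Splitting the re-check into its own method makes the guard visible: if the status changes between the lookup and the reload, runIfStillNew returns false and nothing is executed twice.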
💥 Error Handling
Errors within runExecution(...) are not swallowed. Instead, CampaignExecutionService records them in an ExceptionEntity.
As a result, the user can view the error message and stack trace in the UI.
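One way to capture both the message and the stack trace of a failure is sketched below. This is an illustration only: the fields of the real ExceptionEntity and the way CampaignExecutionService persists it are not shown in this document, so the class ExceptionRecordSketch and its capture method are assumptions.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for ExceptionEntity; the real entity's fields are not documented here.
class ExceptionRecordSketch {
    final String message;
    final String stackTrace;

    ExceptionRecordSketch(Throwable t) {
        this.message = t.getMessage();
        // Render the full stack trace into a string so it can be stored and displayed.
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        this.stackTrace = buffer.toString();
    }

    // Runs the task; returns null on success or a record describing the failure.
    static ExceptionRecordSketch capture(Runnable task) {
        try {
            task.run();
            return null;
        } catch (RuntimeException e) {
            return new ExceptionRecordSketch(e);
        }
    }
}
```

A caller could then set the execution status to EXCEPTION_OCCURRED whenever capture returns a non-null record, and to FINISHED otherwise.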
🧪 Example Behavior
Campaign A: executionType = AUTOMATIC
Producer:
  Creates CampaignExecution(id=100) with status NEW
Consumer:
  Finds id = 100
  Loads it via getSync(100)
  Verifies status is NEW
  Executes the campaign
  Updates the status to FINISHED or EXCEPTION_OCCURRED
📌 Summary
CampaignExecutionConsumer automatically executes campaigns.
Detection: Searches for the latest CampaignExecution in NEW status
Isolation: Execution runs in a new transaction using TransactionTemplate
Safety: @SchedulerLock prevents duplicate executions in clustered setups
Consistency: getSync() ensures serialized access in concurrent environments
Error Logging: Exceptions are saved to ExceptionEntity and shown in the UI