/**
 * Runs a request at most once per idempotency key, using a row in
 * {@code idempotency_table} as an atomic barrier.
 *
 * <p>The first caller to insert the barrier row (the "winner") executes the
 * business logic, writes an outbox row and stores the response on the barrier.
 * Every other caller (a "loser") reads the barrier row and either replays the
 * stored response or is rejected, depending on the recorded status.
 */
@Service
@RequiredArgsConstructor
public class IdempotentService {

    // Barrier insert, PostgreSQL flavour: a duplicate key yields 0 affected rows instead of an
    // error, so the surrounding transaction stays usable for the follow-up SELECT.
    private static final String INSERT_BARRIER_POSTGRES =
            "INSERT INTO idempotency_table (idempotency_key, status) VALUES (?, 'IN_PROGRESS') "
                    + "ON CONFLICT DO NOTHING";

    // Barrier insert, SQL Server flavour: a duplicate key surfaces as DuplicateKeyException.
    private static final String INSERT_BARRIER_SQL_SERVER =
            "INSERT INTO idempotency_table (idempotency_key, status) VALUES (?, 'IN_PROGRESS')";

    private static final String SELECT_BARRIER_POSTGRES =
            "SELECT status, response FROM idempotency_table WHERE idempotency_key=?";

    // UPDLOCK/ROWLOCK table hints are SQL Server syntax and must not be sent to PostgreSQL.
    private static final String SELECT_BARRIER_SQL_SERVER =
            "SELECT status, response FROM idempotency_table WITH (UPDLOCK, ROWLOCK) "
                    + "WHERE idempotency_key=?";

    private static final String INSERT_OUTBOX =
            "INSERT INTO outbox_table (idempotency_key, payload) VALUES (?, ?)";

    private static final String COMPLETE_BARRIER =
            "UPDATE idempotency_table SET status='COMPLETED', response=? WHERE idempotency_key=?";

    private final JdbcTemplate jdbc;

    /** Result handed back to the caller, either freshly computed or replayed from the barrier. */
    public record Response(String result) {}

    /**
     * Handles a request idempotently.
     *
     * @param idempotencyKey key identifying the logical request; used as the barrier row key
     * @param payload        input passed to the business logic when this call is the winner
     * @return the response produced by this call, or the stored response of an earlier
     *         completed call with the same key
     * @throws IllegalStateException if a barrier row already exists and its status is
     *         {@code IN_PROGRESS}, {@code FAILED}, missing, or not recognised
     */
    @Transactional
    public Response handleRequest(String idempotencyKey, String payload) {
        if (tryAcquireBarrier(idempotencyKey)) {
            return executeAsWinner(idempotencyKey, payload);
        }
        return replayExisting(idempotencyKey);
    }

    /** Inserts the barrier row; returns {@code true} only if this call created it. */
    private boolean tryAcquireBarrier(String idempotencyKey) {
        // The insert statement has to follow the same dialect switch as the SELECT below.
        // Without ON CONFLICT DO NOTHING a duplicate key on PostgreSQL raises an error that
        // aborts the current transaction, and the loser's SELECT would then fail as well.
        String insertSql = isPostgres() ? INSERT_BARRIER_POSTGRES : INSERT_BARRIER_SQL_SERVER;
        try {
            // PostgreSQL: 1 row -> winner, 0 rows -> key already present.
            // SQL Server: 1 row -> winner, duplicate -> exception handled below.
            return jdbc.update(insertSql, idempotencyKey) == 1;
        } catch (DuplicateKeyException duplicate) {
            // Expected on the SQL Server path: another call already owns this key.
            return false;
        }
    }

    /** Runs the business logic, records the outbox side effect and completes the barrier. */
    private Response executeAsWinner(String idempotencyKey, String payload) {
        String result = doBusinessLogic(payload);

        // Outbox row carries the computed result, not the raw request payload.
        jdbc.update(INSERT_OUTBOX, idempotencyKey, result);

        // Store the response on the barrier so later calls with the same key can replay it.
        jdbc.update(COMPLETE_BARRIER, result, idempotencyKey);

        return new Response(result);
    }

    /** Reads the existing barrier row and maps its status to a response or a rejection. */
    private Response replayExisting(String idempotencyKey) {
        String selectSql = isPostgres() ? SELECT_BARRIER_POSTGRES : SELECT_BARRIER_SQL_SERVER;

        // NOTE(review): queryForObject is expected to throw if no row matches (e.g. the
        // owning transaction rolled back in between) - confirm callers tolerate that.
        IdempotencyRecord existing = jdbc.queryForObject(
                selectSql,
                (rs, rowNum) -> new IdempotencyRecord(rs.getString("status"), rs.getString("response")),
                idempotencyKey
        );

        String status = existing.status();
        if (status == null) {
            // A NULL status column would otherwise make the switch throw a bare NullPointerException.
            throw new IllegalStateException("Unknown barrier state: null");
        }

        return switch (status) {
            case "COMPLETED" -> new Response(existing.response());
            case "IN_PROGRESS" -> throw new IllegalStateException("Request already in progress");
            // No code in this class writes FAILED; presumably set elsewhere - TODO confirm.
            case "FAILED" -> throw new IllegalStateException("Previous attempt failed, safe to retry");
            default -> throw new IllegalStateException("Unknown barrier state: " + status);
        };
    }

    /**
     * Reports whether the target database is PostgreSQL.
     *
     * <p>Placeholder: always returns {@code true}. Until real detection is implemented the
     * SQL Server statements in this class are never selected.
     */
    private boolean isPostgres() {
        // Detect DB type from DataSource or JdbcTemplate if needed
        return true; // placeholder, implement detection
    }

    /** Produces the result for a payload by prefixing it with {@code "processed:"}. */
    private String doBusinessLogic(String payload) {
        return "processed:" + payload;
    }

    /** Projection of the {@code status} and {@code response} columns of a barrier row. */
    private record IdempotencyRecord(String status, String response) {}
}