Reliably handling externally processed records in a highly concurrent environment.
A three-step approach to handling external processing of records.
To reliably process records in a concurrent environment where the processing involves an external system (one that is not part of the database transaction), we must adopt a "three-step approach".
The three steps are:
- Take ownership of the record by marking it as "processing", using an optimistic locking strategy to prevent other processes from doing the same.
- Process the record itself in the external system (payment gateway, sending an email, etc.).
- Record the result of the processing in the database by marking the record's status as complete or error.
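The three steps above can be sketched in plain Java. This is a minimal in-memory illustration under stated assumptions, not the API used in the sample later on this page: the `Status` enum, the `Record` class and the `ExternalCall` interface are hypothetical names, and `AtomicReference.compareAndSet` stands in for the database's optimistic lock.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreeStepSketch {

    // Hypothetical statuses; the names are assumptions for this sketch.
    enum Status { QUEUED, PROCESSING, OK, FAILED }

    // Hypothetical stand-in for the external system (payment gateway, email server, ...).
    interface ExternalCall {
        boolean run() throws Exception;
    }

    // In-memory stand-in for a database row.
    static class Record {
        final AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED);
    }

    /** Returns true if this caller took ownership and ran the external call. */
    static boolean process(Record record, ExternalCall call) {
        // Step 1: take ownership. Only one caller can move QUEUED -> PROCESSING.
        if (!record.status.compareAndSet(Status.QUEUED, Status.PROCESSING)) {
            return false; // someone else owns the record, or it is already finished
        }

        // Step 2: do the external work, outside any database transaction.
        Status result;
        try {
            result = call.run() ? Status.OK : Status.FAILED;
        } catch (Exception e) {
            result = Status.FAILED;
        }

        // Step 3: record the outcome.
        record.status.set(result);
        return true;
    }

    public static void main(String[] args) {
        Record record = new Record();
        System.out.println(process(record, () -> true)); // first caller takes ownership
        System.out.println(process(record, () -> true)); // second caller skips
        System.out.println(record.status.get());
    }
}
```

If the process dies between step 1 and step 3, the record stays in PROCESSING, which is exactly the state a later check has to look for.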
If a power outage or other extreme event occurs, the only records whose outcome is uncertain are the ones still marked as 'processing'.
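A recovery pass after a restart therefore only has to collect the records left in 'processing'. The following is a minimal sketch, assuming hypothetical `Status` and `Rec` types and an in-memory list in place of a database query:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecoverySketch {

    // Hypothetical statuses; the names are assumptions for this sketch.
    enum Status { QUEUED, PROCESSING, OK, FAILED }

    // In-memory stand-in for a database row.
    static class Rec {
        final String id;
        final Status status;

        Rec(String id, Status status) {
            this.id = id;
            this.status = status;
        }
    }

    /** Returns the ids of records that were mid-flight when the process stopped. */
    static List<String> inDoubt(List<Rec> records) {
        List<String> ids = new ArrayList<>();
        for (Rec r : records) {
            if (r.status == Status.PROCESSING) {
                ids.add(r.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(
                new Rec("a", Status.OK),
                new Rec("b", Status.PROCESSING),
                new Rec("c", Status.QUEUED),
                new Rec("d", Status.PROCESSING));
        System.out.println(inDoubt(records));
    }
}
```

Each record returned this way still has to be checked against the external system, because blindly repeating the external call is only safe when that call is idempotent.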
Sample processing of a record in a concurrent environment:
/**
 * Handling race conditions when processing a record.
 *
 * First, this process needs to take ownership of the record using an
 * optimistic locking strategy to prevent other processes grabbing the
 * same record.
 * http://en.wikipedia.org/wiki/Optimistic_concurrency_control
 *
 * Process the record (an email in this case).
 *
 * Mark the record as completed.
 */
MutableDataSource mds = getConnection().getMutableDataSource();

// Find the email to be sent.
email = (DBEmailSend) mds.findKey(gk);

mds.markSavePoint("SEND_START"); // Mark the start point.

for (int i = 0; true; i++) {
    try {
        /*
         * Record the current transaction so that if a concurrent process
         * tries to change the email then a DirtyCacheException will be thrown.
         */
        email.forceLockedTransaction();

        /*
         * Check and change the status of the email. Change the status
         * to an intermediate status and save.
         *
         * The save process on this record is atomic and only one
         * process will be able to successfully change the status to
         * "PROCESSING".
         */
        String sendStatus = email.getString(DBEmailSend.DBFIELD_SEND_STATUS);

        if (!sendStatus.equals(DBEmailSendStatus.LIST_ENTERED)
                && !sendStatus.equals(DBEmailSendStatus.LIST_QUEUED)) {
            throw new Exception("Send status must be ENTERED or QUEUED");
        }

        email.setValue(DBEmailSend.DBFIELD_SEND_STATUS, DBEmailSendStatus.LIST_PROCESSING);
        mds.save("Email send started processing:" + email);
    } catch (DirtyCacheException dce) {
        /*
         * A concurrent change has been detected. Roll back to the start
         * point and retry. If the other process has successfully taken
         * ownership of the record we will skip.
         */
        mds.rollbackTo("SEND_START");

        if (i == 9) {
            // Give up after ten attempts.
            String errorMsg = "Cannot send email: " + email + " as repeatedly failed in dirty cache";
            LOGGER.error(errorMsg);
            throw new Exception(errorMsg);
        } else {
            // Sleep for a random time of up to one second before retrying.
            long msecs = (long) (1000.0 * Math.random());
            Thread.sleep(msecs);
            continue;
        }
    }

    /*
     * The only way that we would successfully get to this step is that
     * we have taken ownership of the record by setting the status to
     * "PROCESSING".
     *
     * The actual send process will set the status to "OK" or "FAILED".
     *
     * If the machine is turned off or crashes at any point, the only
     * email records whose actual status we need to check are the ones
     * with the status "PROCESSING".
     */
    email.send(mds);
    break;
}
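The same claim-and-retry pattern can be tried out without the data source API used above. The sketch below is an assumption-laden stand-in: `Row`, `StaleVersionException` and `claim` are hypothetical names, a version counter plays the role of the locked transaction, and the exception plays the role of `DirtyCacheException`.

```java
import java.util.concurrent.ThreadLocalRandom;

public class OptimisticRetrySketch {

    // Hypothetical exception, standing in for a "dirty cache" signal.
    static class StaleVersionException extends RuntimeException {
        StaleVersionException(String msg) {
            super(msg);
        }
    }

    // In-memory stand-in for a row with a version column.
    static class Row {
        private int version = 0;
        private String status = "QUEUED";

        synchronized int version() { return version; }
        synchronized String status() { return status; }

        // Saves only if nobody else has saved since expectedVersion was read.
        synchronized void save(int expectedVersion, String newStatus) {
            if (version != expectedVersion) {
                throw new StaleVersionException("row changed since version " + expectedVersion);
            }
            status = newStatus;
            version++;
        }
    }

    /** Tries to claim the row; returns false if it was already claimed or finished. */
    static boolean claim(Row row, int maxAttempts, long maxSleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            int seen = row.version(); // read the version before reading the status
            if (!"QUEUED".equals(row.status())) {
                return false; // another process owns the row: skip
            }
            try {
                row.save(seen, "PROCESSING");
                return true;
            } catch (StaleVersionException e) {
                // Concurrent change detected: back off for a random time, then re-read.
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextLong(maxSleepMillis + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        Row row = new Row();
        System.out.println(claim(row, 10, 1000)); // first claim succeeds
        System.out.println(claim(row, 10, 1000)); // second claim sees PROCESSING and skips
    }
}
```

The random back-off keeps two competing processes from retrying in lockstep, and the attempt limit turns a persistent conflict into a visible error instead of an endless loop.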