SpringBoot – Mongo / Create a Mutex to synchronize multiple frontends

When deploying a API in a scalable environment we usually try tom implement a stateless solution to avoid parallelism conflicts between the different nodes.

This works well for event management synchronously executed. But in some case these event need to have a part of the work managed asynchronously. It was my was my case when I had to process alarm report potentially generating multiple communications with execution time not predictable. To manage this I decided to process theses operation asynchronously. There are multiple architecture for managing this like Queuing (my preferred) of dedicate asynchronous engine. I’ve in fact preferred another way to do it for keeping my infrastructure more simple : adding in all of my nodes a batch function processing the alarms asynchronously.

The alarm events are written into the database and processed by the first of the batch taking it. This allow to preserve a fully redundant and simple architecture. That said I need to ensure only one of the process will execute this process at a time and potentially split the work to do between all the nodes. For making this I had to create a mutex solution.

This post describes the way to create a Mutex solution based on MongoDB with Spring Boot.

Basically Mongo can execute a update in the database in a safe way ensuring no concurrent collision. It’s this way possible to create a Mutex solution based on Mongo update.

To start we are going to create a Mutex entry :

This class will represent a mutex entry, for each of the mutex we create a new entry in the collection name “mutex”. We also maintain information about the mutex requester and the reservation time. This time will help us to manage some potential case of deadlock when, as an exemple, a process stop (crash) the mutex taken.

@Document(collection = "mutex")
public class MongoMutex {

    @Id
    private String id;

    // Key for the mutex entry
    protected String mutexId;

    // Is the mutex taken
    protected boolean taken;

    // Date when it has been taken
    protected long takenAt;

    // Who has taken it
    protected String takenBy;

    // ===============================================================
    // Generated getters & setters
    // ===============================================================
    
    ...
}

The MutexService will give the P() and V() function to manage the Mutex

@Service
public class MongoMutexService {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final boolean verbose = false;

    @Autowired
    protected MongoTemplate mongoTemplate;

    /**
     * Get the mutex. Create it if does not exists. Ensure it has 
     * not been frozen by a dead process.
     * Non blocking.
     * @param mutexId
     * @param identity
     * @return true when reserved, false when not.
     */
    public boolean P(String mutexId,String identity) {
        // Verify if exists
        Query query;

        query = new Query();
        query.addCriteria(Criteria.where("mutexId").is(mutexId));
        List<MongoMutex> my = mongoTemplate.find(query,MongoMutex.class);
        if ( my.size() == 0 ) {
            // create it assuming no collision at this point
            log.info("creating the mutex ("+mutexId+")");
            MongoMutex _m = new MongoMutex();
            _m.setMutexId(mutexId);
            _m.setTaken(false);
            mongoTemplate.save(_m);
        } else if (my.size() > 1) {
            // problem
            log.error("multiple mutex for id ("+mutexId+")");
            for ( MongoMutex _m : my) {
                mongoTemplate.remove(_m);
            }
        }

        // try an update
        query = new Query();
        Criteria c = new Criteria();
        c.andOperator(
                Criteria.where("mutexId").is(mutexId),
                Criteria.where("taken").is(false));
        query.addCriteria(c);
        Update update = new Update();
        update.set("taken", true);
        WriteResult _r = mongoTemplate.updateFirst(query,update,MongoMutex.class);
        if ( _r.getN() > 0 )
        {
            // sucess ...
            // Update the information
            query = new Query();
            query.addCriteria(Criteria.where("mutexId").is(mutexId));
            MongoMutex _m = mongoTemplate.findOne(query,MongoMutex.class);
            _m.setTakenAt(Now.NowUtcMs());
            _m.setTakenBy(identity);
            mongoTemplate.save(_m);
            if ( verbose ) log.info("Mutex ("+mutexId+") reserved by ("+identity+")");
            return true;
        } else {
            log.info("Failed to reserve Mutex ("+mutexId+") by ("+identity+")");
            query = new Query();
            query.addCriteria(Criteria.where("mutexId").is(mutexId));
            MongoMutex _m = mongoTemplate.findOne(query,MongoMutex.class);
            if ( _m != null && Now.NowUtcMs() - _m.getTakenAt() > (10*60*1000) /*this is magic ;) */ ) {
                log.error("Mutex ("+mutexId+") reserved by ("+identity+") was locked !");
                // Mutex blocked for more than 10 minutes looks like frozen...
                _m.setTaken(false);
                mongoTemplate.save(_m);
            }
            return false;
        }

    }

    /**
     * Free a mutex previously reserved
     * @param mutexId
     */
    public void V(String mutexId) {
        Query query = new Query();
        Criteria c = new Criteria();
        c.andOperator(
                Criteria.where("mutexId").is(mutexId),
                Criteria.where("taken").is(true));
        query.addCriteria(c);
        Update update = new Update();
        update.set("taken", false);
        WriteResult _r = mongoTemplate.updateFirst(query,update,MongoMutex.class);
        if ( _r.getN() == 0 ) {
            log.warn("Try to release a Mutex already realeased ...("+mutexId+")");
        } else if ( _r.getN() > 1 ) {
            log.error("Found multiple Mutex ... this is not normal");
        } else {
            if (verbose) log.info("Mutex ("+mutexId+") released");
        }
    }


}

Now we can use the Mutex solution in a process scheduled on all the SpringBoot node running concurrently but not collisionning each other.

@Autowired
protected MongoMutexService mongoMutexService;

// ================================================================
// Alarm Sched
// ================================================================
@Scheduled(fixedRate = 5_000)
public void processAlarmsAsync() {
    // processing alarms
    log.info("Try to processing alarms");
    if ( mongoMutexService.P("alarm","alarm") ) {

        log.info("Processing alarms");

        mongoMutexService.V("alarm");
    }
}

Have fun !

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.