
By Claus Ibsen and Jonathan Anstey

This article was excerpted from the book Camel in Action, 2nd Edition.

 

Transactions can be used as a means of coordinating state updates among distributed systems so that they form a single unit of work.

Related to this concept is idempotency. The term idempotent is used in mathematics to describe a function that can be applied multiple times without changing the result beyond the initial result. In computing, the term idempotent is used to describe an operation that will produce the same results if executed once or multiple times.
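To make the distinction concrete, here is a minimal sketch in plain Java. The class and method names are made up for illustration and are not part of Camel. Recording an order id in a set is idempotent, whereas adding an amount to a total is not.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration; none of these names come from Camel.
class IdempotencySketch {

    // Idempotent: adding the same id again leaves the set as it was after the first call.
    static Set<String> markShipped(Set<String> shipped, String orderId) {
        shipped.add(orderId);
        return shipped;
    }

    // Not idempotent: every call changes the result again.
    static int addToTotal(int total, int amount) {
        return total + amount;
    }

    public static void main(String[] args) {
        Set<String> shipped = new HashSet<>();
        markShipped(shipped, "123");
        markShipped(shipped, "123");
        System.out.println("shipped orders: " + shipped.size());

        int total = addToTotal(0, 10);
        total = addToTotal(total, 10);
        System.out.println("total: " + total);
    }
}
```

Applying markShipped twice leaves one entry in the set, while applying addToTotal twice doubles the amount.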

Idempotency is documented in the EIP book as the Idempotent Consumer pattern. This pattern deals with the problem of how to reliably exchange messages in a distributed system. The book states that even if a sender sends a message only once, the receiver may receive the message more than once. How can this be?

When a sender sends a message to a receiver, the sender can only know that the receiver has received and accepted the message if an acknowledgement is returned from the receiver. However, if that acknowledgement is lost due to a networking issue, then the sender may need to resend a message that the receiver has already received, resulting in a duplicate message. Figure 1 illustrates this principle.


Ibsen_idempotency_01

Figure 1 The receiver receives a duplicate message because the acknowledgement from the receiver to the sender was lost
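The scenario can be simulated with a small plain-Java sketch. It is a hypothetical model, not Camel or ActiveMQ code: the Receiver class, the sendReliably method, and the idea of "losing" a fixed number of acknowledgements are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a lost acknowledgement; not Camel or ActiveMQ code.
class LostAckSketch {

    // Receiver that accepts every message it is handed.
    static final class Receiver {
        final List<String> received = new ArrayList<>();
        private int acksToLose;

        Receiver(int acksToLose) {
            this.acksToLose = acksToLose;
        }

        // Returns true if the acknowledgement reaches the sender.
        boolean deliver(String message) {
            received.add(message);      // the message itself always arrives
            if (acksToLose > 0) {
                acksToLose--;           // simulate the acknowledgement getting lost
                return false;
            }
            return true;
        }
    }

    // Sender that resends until it sees an acknowledgement; returns the number of attempts.
    static int sendReliably(Receiver receiver, String message, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (receiver.deliver(message)) {
                return attempt;
            }
        }
        throw new IllegalStateException("No acknowledgement after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver(1);
        int attempts = sendReliably(receiver, "order 123", 3);
        System.out.println("attempts=" + attempts + ", received=" + receiver.received);
    }
}
```

With one lost acknowledgement, the sender needs two attempts and the receiver ends up holding the same message twice.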


Therefore many messaging systems, such as Apache ActiveMQ, have built-in mechanisms to eliminate duplicate messages so users don’t have to worry about duplicates.

In Camel, this mechanism is implemented as the idempotent consumer EIP.


Idempotent Consumer EIP

In Camel, the idempotent consumer EIP is used to ensure that a message is routed once and only once. To achieve this, Camel needs to be able to detect duplicate messages. This involves the following two parts:

  • An expression that evaluates a unique key for each message
  • A repository that stores previously seen keys

The heart of the idempotent consumer implementation is the repository, which is defined as the interface org.apache.camel.spi.IdempotentRepository with the following methods:

 boolean add(E key);
 boolean contains(E key);
 boolean remove(E key);
 boolean confirm(E key);

How this works in Camel is illustrated in figure 2 and explained in the following steps:

  1. When an Exchange reaches the idempotent consumer, the unique key is evaluated from the Exchange using the configured Camel Expression. For example, this can be a message header, or an XPath expression, etc.
  2. The key is checked against the idempotent repository to determine whether it’s a duplicate. This is done by calling the add method. If the method returns true, then the key was successfully added and no duplicate is detected. If the method returns false, then the key already exists and the message is detected as a duplicate.
  3. If the Exchange is not a duplicate, then it’s routed as usual.
  4. When the Exchange is done being routed, then its Synchronization callback is invoked to determine whether to commit or roll back the Exchange.
  5. To commit the Exchange, the confirm method is invoked on the idempotent repository.
  6. To roll back the Exchange, the remove method is invoked on the idempotent repository, which removes the key from the repository. Because the key is removed, upon redelivery of the same message Camel has no prior knowledge of having seen this message before, and therefore does not regard the message as a duplicate. If you want to prevent this and regard the redelivered message as a duplicate, you can configure the option removeOnFailure to false on the idempotent consumer. In that case the key is not removed upon rollback but confirmed instead (the same action is performed as for a commit in step 5).

Ibsen_idempotency_02

Figure 2 How the idempotent consumer EIP operates in Camel
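The steps can be sketched in plain Java as follows. This is a simplified, hypothetical model: a HashSet stands in for the idempotent repository, the handle method and the class name are invented, and Camel's real implementation (which uses the Synchronization callback) differs in detail.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the idempotent consumer flow; not Camel's implementation.
class IdempotentFlowSketch {

    private final Set<String> keys = new HashSet<>();   // stands in for the repository
    private final boolean removeOnFailure;

    IdempotentFlowSketch(boolean removeOnFailure) {
        this.removeOnFailure = removeOnFailure;
    }

    // Returns true if the message was routed, false if it was skipped as a duplicate.
    boolean handle(String key, Consumer<String> route) {
        // Steps 1-2: the key is assumed to be evaluated already; Set.add reports whether it is new.
        if (!keys.add(key)) {
            return false;                    // duplicate: skip the routing
        }
        try {
            route.accept(key);               // step 3: route as usual
            // Step 5: commit. A plain set has nothing further to confirm.
        } catch (RuntimeException e) {
            // Step 6: roll back by forgetting the key, unless removeOnFailure is false.
            if (removeOnFailure) {
                keys.remove(key);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        IdempotentFlowSketch consumer = new IdempotentFlowSketch(true);
        List<String> processed = new ArrayList<>();
        for (String orderId : new String[] {"123", "123", "789", "456", "789"}) {
            consumer.handle(orderId, processed::add);
        }
        System.out.println(processed);
    }
}
```

Feeding the keys 123, 123, 789, 456, 789 through the sketch routes only 123, 789, and 456. If the route throws and removeOnFailure is true, the key is forgotten, so a redelivery of the same key is routed again.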


Let’s see this in action.

USING THE IDEMPOTENT CONSUMER EIP

In order to use the idempotent consumer EIP, we need to set up the idempotent repository first. Listing 1 shows an example of how to do this using XML DSL.


Listing 1 – Using Idempotent Consumer EIP in XML DSL

 <bean id="repo"
       class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
   <route>
     <from uri="seda:inbox"/>
     <log message="Incoming order ${header.orderId}"/>
     <to uri="mock:inbox"/>
     <idempotentConsumer messageIdRepositoryRef="repo">
       <header>orderId</header>
       <log message="Processing order ${header.orderId}"/>
       <to uri="mock:order"/>
     </idempotentConsumer>
   </route>
 </camelContext>

 Using the memory idempotent repository

 Route segment using the idempotent consumer

 Expression to evaluate the unique key


In this example we use the built-in memory-based repository, which is defined as a bean in XML DSL. In the Camel route, we then wrap the routing segment that we want invoked only once per unique message. The attribute messageIdRepositoryRef must be configured to refer to the chosen repository. The example uses the message header orderId as the unique key.

NOTE The unique key is evaluated as a String type. Therefore you can only use information from the Exchange that can be converted to a String type. In other words, you cannot use complex types such as Java objects unless they generate unique keys using their toString method.
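The following plain-Java sketch illustrates why the toString method matters for object keys. The two order classes are hypothetical and exist only for this illustration; the sketch does not involve Camel's type converters.

```java
// Hypothetical order types used only to illustrate String keys derived from objects.
class KeySketch {

    // No toString override: the String form is the class name, '@', and a hash code,
    // which is tied to the instance rather than to the order id.
    static final class PlainOrder {
        final String id;

        PlainOrder(String id) {
            this.id = id;
        }
    }

    // toString returns the business key, so two orders with the same id yield the same key.
    static final class KeyedOrder {
        final String id;

        KeyedOrder(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "order-" + id;
        }
    }

    public static void main(String[] args) {
        System.out.println(new PlainOrder("123"));   // class name, '@', and a hash code
        System.out.println(new KeyedOrder("123"));   // order-123
    }
}
```

Two KeyedOrder instances with the same id produce the same key, whereas the String form of a PlainOrder says nothing about the order id and is therefore unsuitable as a unique key.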

The example from listing 1 can be written as follows using Java DSL.


Listing 2  – Idempotent Consumer using Java DSL

 public void configure() throws Exception {

   IdempotentRepository repo = new MemoryIdempotentRepository();

   from("seda:inbox")
     .log("Incoming order ${header.orderId}")
     .to("mock:inbox")
     .idempotentConsumer(header("orderId"), repo)
       .log("Processing order ${header.orderId}")
       .to("mock:order")
     .end();
 }

 Using the in-memory idempotent repository

 Route segment using the idempotent consumer, and expression to evaluate unique key


You can try this example using the following Maven goals:

 mvn test -Dtest=SpringIdempotentTest
 mvn test -Dtest=IdempotentTest

Running this example sends five messages to the Camel route, two of which are duplicates, so only three messages are processed by the idempotent consumer. The example uses the following code to send the five messages:

 template.sendBodyAndHeader("seda:inbox", "Motor", "orderId", "123");
 template.sendBodyAndHeader("seda:inbox", "Motor", "orderId", "123");
 template.sendBodyAndHeader("seda:inbox", "Tires", "orderId", "789");
 template.sendBodyAndHeader("seda:inbox", "Brake pad", "orderId", "456");
 template.sendBodyAndHeader("seda:inbox", "Tires", "orderId", "789");

Which outputs the following to the console:

 2015-04-12 [ - seda://inbox] INFO  route1 - Incoming order 123
 2015-04-12 [ - seda://inbox] INFO  route1 - Processing order 123
 2015-04-12 [ - seda://inbox] INFO  route1 - Incoming order 123
 2015-04-12 [ - seda://inbox] INFO  route1 - Incoming order 789
 2015-04-12 [ - seda://inbox] INFO  route1 - Processing order 789
 2015-04-12 [ - seda://inbox] INFO  route1 - Incoming order 456
 2015-04-12 [ - seda://inbox] INFO  route1 - Processing order 456
 2015-04-12 [ - seda://inbox] INFO  route1 - Incoming order 789

As you can see, each order is processed only once, as indicated by the log "Processing order ...".

The example uses the in-memory idempotent repository, but Camel provides other implementations.

Idempotent Repositories

Camel provides a great number of idempotent repositories, each supporting a different level of service – some are for standalone in-memory only, others support clustered environments. Table 1 lists all the current implementations provided by Apache Camel 2.16 with some tips about pros and cons.

Table 1 IdempotentRepository implementations offered by Apache Camel 2.16

Idempotent Repository

Description

MemoryIdempotentRepository

A very fast in-memory store that stores entries in a Map structure. All data will be lost when the JVM is stopped. No support for clustering. This store is provided in the camel-core module.

FileIdempotentRepository

A file-based store that uses a Map as a cache for fast lookup. All write operations are synchronized, which means the store can be a bottleneck under highly concurrent load. Each write operation rewrites the file, which means writes become slower for very large data sets. This store supports active/passive clustering, where the passive node is on standby and takes over processing if the master dies. This store is provided in the camel-core module.

JdbcMessageIdRepository

A store using JDBC to store keys in a SQL database. This implementation uses no caching, so each operation accesses the database. Each idempotent consumer must be associated with a unique name, but can reuse the same database table to store keys. High-volume processing can become a performance bottleneck if the database cannot keep up. Clustering is supported in both active/passive and active/active mode. This store is provided in the camel-sql module. To use this store, you must set up the required SQL table using the SQL script listed on the camel-sql documentation webpage.

JpaMessageIdRepository

Similar to JdbcMessageIdRepository but uses JPA as the SQL layer. This store is provided in the camel-jpa module. With this store, JPA can automatically create the needed SQL tables.

NamedCassandraIdempotentRepository

A store using Apache Cassandra as the database. This implementation uses Cassandra CQL queries. Clustering is supported natively by Cassandra. This store is provided in the camel-cassandraql module. To use this store, you must set up the required table using the CQL script listed on the camel-cassandra documentation webpage.

HazelcastIdempotentRepository

A store using the Hazelcast in-memory data grid as a shared Map of the keys across the cluster of JVMs. Hazelcast can be configured to be in-memory only or also write the Map structure to persistent store using different levels of QoS. This implementation is suitable for clustering. This store is provided in the camel-hazelcast module.

InfinispanIdempotentRepository

Similar to Hazelcast but using JBoss Infinispan instead as the in-memory data grid. This store is provided in the camel-infinispan module.

HBaseIdempotentRepository

A store using Apache HBase as the database. This store should only be considered if you already have an existing Hadoop installation, as it would be overkill to set up Hadoop only for Camel to use as the idempotent repository. This implementation is suitable for clustering. This store is provided in the camel-hbase module.

RedisIdempotentRepository

A store using Redis key/value store.  This implementation is suitable for clustering. This store is provided in the camel-spring-redis module.

It is also possible to write your own custom implementation by implementing the org.apache.camel.spi.IdempotentRepository interface. If you are looking for examples of how to do that, each of the implementations listed in table 1 can be used as inspiration.
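As a rough idea of what a custom store involves, the sketch below implements the four methods listed earlier on top of a concurrent set. To keep it self-contained, it uses a local stand-in interface named KeyRepository, which is an assumption made for this example and not Camel's interface. The real org.apache.camel.spi.IdempotentRepository may declare additional methods (for example for lifecycle handling) depending on the Camel version.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in that mirrors the four methods listed in this article; NOT Camel's interface.
interface KeyRepository<E> {
    boolean add(E key);
    boolean contains(E key);
    boolean remove(E key);
    boolean confirm(E key);
}

// Hypothetical thread-safe, unbounded in-memory implementation.
class SetBackedRepository implements KeyRepository<String> {

    private final Set<String> keys = ConcurrentHashMap.newKeySet();

    @Override
    public boolean add(String key) {
        // Atomic check-and-insert: false means the key was already present.
        return keys.add(key);
    }

    @Override
    public boolean contains(String key) {
        return keys.contains(key);
    }

    @Override
    public boolean remove(String key) {
        return keys.remove(key);
    }

    @Override
    public boolean confirm(String key) {
        // Nothing to finalize for an in-memory set.
        return true;
    }
}
```

The important design point is that add performs the duplicate check and the insert as one atomic operation. Calling contains followed by add would leave a gap in which two threads could both conclude that the key is new.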


Cache eviction

Each implementation of the idempotent repository listed in table 1 has a different strategy for cache eviction. The in-memory based implementations limit the number of entries they keep in the cache by using an LRU (Least Recently Used) cache. The size of this cache can be configured. When the cache is full and more keys are added, the least recently used key is discarded first.

Other implementations such as SQL and JPA require you to manually delete unwanted records from the database table.

In-memory data grids such as Hazelcast and Infinispan provide configuration for setting the eviction strategy.

You should study the documentation of the used technology to ensure that you use the proper eviction strategy.
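To illustrate what LRU eviction means for a key cache, here is a minimal sketch built on java.util.LinkedHashMap in access order. The LruKeyCache class is hypothetical and is not the cache class Camel uses internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU key cache; not the cache class Camel uses internally.
class LruKeyCache {

    private final Map<String, Boolean> cache;

    LruKeyCache(final int maxSize) {
        // accessOrder=true orders entries from least to most recently used.
        this.cache = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxSize;    // drop the least recently used key when full
            }
        };
    }

    // Returns true if the key was not in the cache before.
    synchronized boolean add(String key) {
        return cache.put(key, Boolean.TRUE) == null;
    }

    // A successful lookup counts as a use and refreshes the entry.
    synchronized boolean contains(String key) {
        return cache.get(key) != null;
    }
}
```

With a maximum size of two, adding a and b, looking up a, and then adding c evicts b, because b is the least recently used key at that point. A message with key b arriving afterwards would no longer be detected as a duplicate, which is why the cache size needs to match the expected window for duplicates.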


A common use case with Camel is to let multiple Camel applications process files from a shared directory. Solving it involves using a clustered idempotent repository.


Clustered idempotent repository

This section covers a common use case involving clustering and the idempotent consumer pattern. In this use case, a shared file system is used for exchanging data, which you want to process as fast as possible by scaling out your system into a cluster of active nodes, where each node can pick up incoming files and process them.

The file component from Apache Camel has built-in support for idempotency. But for historical reasons, this implementation does not support atomic operations that a clustered solution would require; there is a small window of opportunity in which duplicates could still happen. Figure 3 illustrates this principle.


Ibsen_idempotency_03

Figure 3 Two clustered nodes (1)(2) with the same Camel application compete concurrently to pick up new files from a shared file system (3) and they may pick up the same file (4), which causes duplicates.


When a new file is written to the shared file system (3), each active node (1)(2) in the cluster reacts independently and, depending on timing and nondeterministic factors, the same file can potentially be picked up by more than one node (4). This is not what we want to happen. We want only one node to process a given file at any time, but we also want each node to process different files at the same time, to achieve higher throughput and distribute the work across the nodes in the cluster, so node 1 can process file A while node 2 processes file B, and so forth.

The solution we’re looking for is a clustered idempotent repository that the nodes use to coordinate and that grants the lock for a file to exactly one node. Figure 4 shows how this plays out in Camel.


Ibsen_idempotency_04

Figure 4 Two clustered nodes (1)(2) with the same Camel application compete concurrently to pick up new files from a shared file system (3). Both nodes try to acquire a read lock (4), which is granted to exactly one node, permitting it to pick up and process the file (5). The other node fails to acquire the lock and skips the file.


As before, each node in the cluster (1)(2) reacts when new files are available from the shared file system (3). When a node detects a file to pick up, it now first attempts to grab an exclusive read lock (4) from the clustered idempotent repository. If granted, then the file consumer can pick up and process the file (5). And when it’s done with the file, the read lock is released. While the read lock is granted to a node, then any other node that attempts to acquire a read lock for the same file name would be denied by the clustered idempotent repository. The read lock is per file, so each node can read lock different files concurrently.
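The locking idea can be modeled in plain Java with two threads acting as nodes. This is a hypothetical, single-JVM sketch: a concurrent set stands in for the clustered idempotent repository, another set stands in for the shared directory, and the names ReadLockSketch, tryAcquire, release, and runTwoNodes are invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-JVM model of the per-file read lock; not Camel or Hazelcast code.
class ReadLockSketch {

    // Stands in for the clustered idempotent repository shared by all nodes.
    private final Set<String> locks = ConcurrentHashMap.newKeySet();

    // Exactly one caller gets true for a given file name until the lock is released.
    boolean tryAcquire(String fileName) {
        return locks.add(fileName);
    }

    void release(String fileName) {
        locks.remove(fileName);
    }

    // Runs two "nodes" over the same files and returns the file names that were processed.
    static List<String> runTwoNodes(List<String> files) {
        ReadLockSketch repo = new ReadLockSketch();
        Set<String> inbox = ConcurrentHashMap.newKeySet();   // stands in for the shared directory
        inbox.addAll(files);
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        Runnable node = () -> {
            for (String file : files) {
                if (!repo.tryAcquire(file)) {
                    continue;                     // another node holds the lock: skip the file
                }
                try {
                    if (inbox.contains(file)) {   // still present, so nobody has processed it yet
                        processed.add(file);
                        inbox.remove(file);       // models deleting the file after processing
                    }
                } finally {
                    repo.release(file);
                }
            }
        };

        Thread foo = new Thread(node, "FOO");
        Thread bar = new Thread(node, "BAR");
        foo.start();
        bar.start();
        try {
            foo.join();
            bar.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the nodes", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            files.add("file-" + i + ".txt");
        }
        System.out.println("processed " + runTwoNodes(files).size() + " files");
    }
}
```

Because a file is only processed while its lock is held, and it is removed from the inbox before the lock is released, each file is processed by exactly one of the two nodes regardless of how the threads interleave.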

Let’s see how we can do this in Camel using the camel-hazelcast module.

 

USING HAZELCAST AS CLUSTERED IDEMPOTENT REPOSITORY

The camel-hazelcast module provides a clustered idempotent repository with the class name HazelcastIdempotentRepository. To use this repository, we first need to configure Hazelcast which we can either use as a client to an existing external Hazelcast cluster, or can embed together with our Camel application.

Hazelcast can be configured using Java code or from an XML file. We use the latter in this example: we copied the sample configuration distributed with Hazelcast and placed the file in src/main/resources/hazelcast.xml. Hazelcast allows each node to auto-join using multicast, so we don’t have to manually configure hostnames, IP addresses, and so on for each node to be part of the cluster. However, we had to change the hazelcast.xml configuration from using UDP to TCP with localhost to make the example run out of the box on personal computers (not all popular operating systems have UDP enabled out of the box). We have left comments in the hazelcast.xml file explaining how we did that.

Take a look at this source code. To represent two nodes we have two almost identical source files camelinaction.ServerFoo and camelinaction.ServerBar. Listing 4 shows the source code for ServerFoo.


Listing 4 – A standalone Java Camel application with Hazelcast embedded

 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
 import org.apache.camel.main.Main;
 import org.apache.camel.processor.idempotent.hazelcast.HazelcastIdempotentRepository;

 public class ServerFoo {

   private Main main;

   public static void main(String[] args) throws Exception {
     ServerFoo foo = new ServerFoo();
     foo.boot();
   }

   public void boot() throws Exception {
     HazelcastInstance hz = Hazelcast.newHazelcastInstance();

     HazelcastIdempotentRepository repo = new HazelcastIdempotentRepository(hz, "camel");

     main = new Main();
     main.enableHangupSupport();
     main.bind("myRepo", repo);
     main.addRouteBuilder(new FileConsumerRoute("FOO", 100));
     main.run();
   }
 }

 ServerFoo is a standalone Java application with a main method to boot the application.

 Create and embed a Hazelcast instance in server mode

 Set up the Camel Hazelcast idempotent repository using the name camel

 Bind the idempotent repository to the Camel registry with the name myRepo

 Add a Camel route from Java code and run the application


Each node is a standalone Java application with a main method (1). We create and embed a Hazelcast instance (2) which, by default, reads its configuration from the file named hazelcast.xml in the root of the classpath (here src/main/resources/hazelcast.xml). Once Hazelcast has started, we can create the Hazelcast idempotent repository (3). The remainder of the code uses Camel Main to bind the repository to the registry (4) and to add the route and run the Camel application (5).

The Camel route that uses the idempotent repository is shown in listing 5.


Listing 5  – Camel route using clustered idempotent repository with the file consumer

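 import org.apache.camel.builder.RouteBuilder;

 public class FileConsumerRoute extends RouteBuilder {

   private final String name;   // node name used in the log output
   private final long delay;    // artificial processing delay in milliseconds

   public FileConsumerRoute(String name, long delay) {
     this.name = name;
     this.delay = delay;
   }

   public void configure() throws Exception {
     from("file:target/inbox" +
             "?delete=true" +
             "&readLock=idempotent" +
             "&idempotentRepository=#myRepo")
         .log(name + " - Received file: ${file:name}")
         .delay(delay)
         .log(name + " - Done file:     ${file:name}")
         .to("file:target/outbox");
   }
 }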

 The read lock must be configured as idempotent

 The idempotent repository to use

 Delays processing the file so we humans have a chance to see what happens


The Camel route is simply a file consumer that picks up files from the target/inbox directory (which is the shared directory in this example). The consumer is configured to use idempotent as its read lock. The idempotent repository is configured with the value #myRepo, which refers to the repository bound to the Camel registry under the name myRepo. When a file is being processed, it’s logged and then delayed for a little while, so the application runs a bit slower and we can better see what happens.

You can try this example by running the following Maven goals from separate shells so they run at the same time (you can also run them from your IDE, as each is a standard Java main application).

 mvn compile exec:java -Pfoo
 mvn compile exec:java -Pbar

When you start the second application, Hazelcast should log the cluster state as shown below:

 Members [2] {
       Member [localhost]:5701
       Member [localhost]:5702 this
 }

Here we can see there are two members in the cluster, and that they are linked together over TCP using ports 5701 and 5702.

If you copy a bunch of files to the target/inbox directory, then each node will compete concurrently to pick up and process the files. When a node can’t acquire an exclusive read lock, a WARN is logged as shown:

WARN  tentRepositoryReadLockStrategy - Cannot acquire read lock. Will skip the file: GenericFile[AsyncCallback.java]
WARN  tentRepositoryReadLockStrategy - Cannot acquire read lock. Will skip the file: GenericFile[Attachments.java]

If you look at the output from both consoles, you should see that a file one node skips with a WARN is not skipped by the other node, and vice versa.

We can reduce read-lock contention by processing the files in random order. This can be done by configuring the file endpoint with shuffle=true. For example, when the author of this article tried this, the number of WARNs logged while processing 126 files went from 59 to only three with shuffle turned on.

TIP You can also use the camel-infinispan module instead of Hazelcast, as it offers similar clustered caching capabilities.

One last thing that needs to be configured is the eviction strategy.

EVICTION STRATEGY

Hazelcast will, by default, keep each element in its distributed cache forever. This is often not what you want: you may want to be able to pick up a file in the future that has the same name as a previously processed file, and if you’re using unique file names, keeping old names in the cache is a waste of memory. To remove these old file names from the cache, an eviction strategy must be configured. Hazelcast has many options for this, and in this example we have configured the file names to stay in the cache for 60 seconds using the time-to-live option.

<time-to-live-seconds>60</time-to-live-seconds>
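The effect of a time-to-live setting on duplicate detection can be sketched in plain Java. The TtlKeyStore class below is a hypothetical model with an injectable clock so the behavior can be shown without waiting. It does not reflect how Hazelcast implements eviction internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of time-to-live eviction; not how Hazelcast implements it internally.
class TtlKeyStore {

    private final Map<String, Long> expiresAt = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;    // supplies the current time in milliseconds

    TtlKeyStore(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns true if the key is new or its previous entry has expired.
    synchronized boolean add(String key) {
        long now = clock.getAsLong();
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry > now) {
            return false;                       // still cached: treated as a duplicate
        }
        expiresAt.put(key, now + ttlMillis);    // new or expired: (re)register the key
        return true;
    }

    public static void main(String[] args) {
        TtlKeyStore store = new TtlKeyStore(60_000L, System::currentTimeMillis);
        System.out.println(store.add("report.csv"));   // first sighting of this file name
        System.out.println(store.add("report.csv"));   // same name again within 60 seconds
    }
}
```

With a 60-second time to live, a file name seen again within that window is treated as a duplicate, while the same name arriving after the window is accepted as a new file.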