Reading files is a common use case for Apache Camel. Whether a file kicks off a larger route or you simply need to store its content, it is important that each file is read only once. That is easy when your route is deployed on a single server, but what about when you deploy the route to multiple servers? Thankfully, Camel has the concept of an Idempotent Consumer.
A useful way to implement this is with an idempotent repository. The repository keeps track of the file being read and prevents another server from reading it; in effect, it acts as a lock on the file. Once the file has been read, you can either remove the row from the database, which allows files with the same name to be ingested later, or keep the row, which prevents files with the same name from being ingested again.
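To make the lock-and-release behaviour concrete, here is a toy, in-memory model in plain Java. It is not Camel's API (the real contract is org.apache.camel.spi.IdempotentRepository, and in this article a database table plays the role of the shared set). FileClaims, tryClaim, and complete are hypothetical names chosen for illustration, and the failure/rollback path is not modelled.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Toy model of an idempotent repository used as a file lock.
 * Hypothetical class, NOT Camel's API; the Set stands in for the shared database table.
 */
public class FileClaims {
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();
    private final boolean removeOnCommit;

    public FileClaims(boolean removeOnCommit) {
        this.removeOnCommit = removeOnCommit;
    }

    /** Returns true only for the first caller that claims this file name (atomic add). */
    public boolean tryClaim(String fileName) {
        return claimed.add(fileName);
    }

    /** Called after the file has been processed successfully. */
    public void complete(String fileName) {
        if (removeOnCommit) {
            claimed.remove(fileName); // a file with the same name may be ingested later
        }
        // otherwise the entry stays and same-named files are skipped from now on
    }

    public static void main(String[] args) {
        FileClaims keep = new FileClaims(false);
        System.out.println(keep.tryClaim("orders.csv")); // true  (server A wins)
        System.out.println(keep.tryClaim("orders.csv")); // false (server B is locked out)
        keep.complete("orders.csv");
        System.out.println(keep.tryClaim("orders.csv")); // false (entry kept)

        FileClaims remove = new FileClaims(true);
        remove.tryClaim("orders.csv");
        remove.complete("orders.csv");
        System.out.println(remove.tryClaim("orders.csv")); // true (entry removed on commit)
    }
}
```

The two instances correspond to the two choices described above: keeping the row blocks same-named files permanently, while removing it only blocks them for as long as the file is being processed.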
So how do you set up an idempotent repository using JDBC and Spring?
First, you will need to create the table and the necessary Spring beans. Here I am using the JdbcMessageIdRepository. At the time of development, this bug was still an issue, so I could not use the JPA version. It has since been fixed, and you can easily swap one for the other. This example assumes you already have a DataSource bean set up. You can use the same fileConsumerRepo bean for multiple routes in your deployment.
```sql
CREATE TABLE CAMEL_MESSAGEPROCESSED (
    processorName VARCHAR(255),
    messageId     VARCHAR(100),
    createdAt     TIMESTAMP
);
```
```java
import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
import org.springframework.context.annotation.Bean;

// Inside your @Configuration class; dataSource() is your existing DataSource bean method.
@Bean
public JdbcMessageIdRepository fileConsumerRepo() {
    // The second argument is the processor name written to the processorName column.
    // Construction errors are allowed to propagate so the context fails fast,
    // instead of registering a null repository that the route would trip over later.
    return new JdbcMessageIdRepository(dataSource(), "fileConsumerRepo");
}
```
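Each row in CAMEL_MESSAGEPROCESSED pairs the repository's processor name with a message id, which is why routes sharing one fileConsumerRepo bean also share one set of entries. The sketch below models that scoping with a set of (processorName, messageId) pairs. ProcessedTable, MessageKey, the example file paths, and the name otherRepo are hypothetical, and the record requires Java 16+. Note that the CREATE TABLE statement above declares no primary key or unique constraint, so the database itself does not enforce uniqueness of the pair; in this toy, the Set plays that role.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of how entries are scoped by (processorName, messageId). Hypothetical names. */
public class ProcessedTable {
    // One "row"; createdAt is omitted because it plays no part in the lookup.
    record MessageKey(String processorName, String messageId) {}

    private final Set<MessageKey> rows = ConcurrentHashMap.newKeySet();

    /** Succeeds only when the pair is not already present. */
    public boolean add(String processorName, String messageId) {
        return rows.add(new MessageKey(processorName, messageId));
    }

    public static void main(String[] args) {
        ProcessedTable table = new ProcessedTable();
        // Two routes using the same "fileConsumerRepo" bean share one processorName...
        System.out.println(table.add("fileConsumerRepo", "/my/test/dir/a.txt")); // true
        System.out.println(table.add("fileConsumerRepo", "/my/test/dir/a.txt")); // false
        // ...while a repository built with a different (hypothetical) name is independent.
        System.out.println(table.add("otherRepo", "/my/test/dir/a.txt"));        // true
    }
}
```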
In addition, make sure to include the org.apache.camel.processor.idempotent.jpa package, which holds the entity used by the JPA version of the repository, in your packages to scan or your persistence unit, depending on your exact setup. In my case, I was using a LocalContainerEntityManagerFactoryBean for JPA.
```java
@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory() throws NamingException {
    final LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
    em.setPersistenceUnitName("camel");

    final HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
    vendorAdapter.setDatabase(Database.ORACLE);
    vendorAdapter.setShowSql(false);
    vendorAdapter.setGenerateDdl(false);
    em.setJpaVendorAdapter(vendorAdapter);

    em.setPackagesToScan(new String[] { "com.my.package.model", "org.apache.camel.processor.idempotent.jpa" });
    em.setJpaProperties(additionalProperties());
    em.setDataSource(this.dataSource());
    return em;
}
```
Now your idempotent repository is ready for Camel to use. Here is an example route:
```java
from("file:/my/test/dir"
        + "?moveFailed=.error"
        + "&autoCreate=true"
        + "&readLockLoggingLevel=WARN"
        + "&shuffle=true"
        + "&readLock=idempotent"
        + "&idempotentRepository=#fileConsumerRepo"
        + "&readLockRemoveOnCommit=true")
    .routeId("ingestionFile")
    .convertBodyTo(String.class)
    .log(LoggingLevel.INFO, "File received");
```
Let's break down the options:
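| Option | Meaning |
|---|---|
| moveFailed | Where the file is moved if processing fails (here, an .error subdirectory of the input directory) |
| autoCreate | Whether missing directories in the path are created automatically |
| readLockLoggingLevel | The level at which Camel logs when it cannot obtain a read lock on a file |
| shuffle | Whether the list of files is processed in random order |
| readLock | The read-lock strategy to use; idempotent uses the idempotent repository as the lock |
| idempotentRepository | The repository to use; #fileConsumerRepo refers to the Spring bean named fileConsumerRepo |
| readLockRemoveOnCommit | Whether the file's entry (row) is removed from the repository once the file has been processed successfully; false by default, which keeps the row |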
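If the long endpoint URI becomes hard to maintain, one option is to keep the options from the table in an ordered map and assemble the string in code. This is a minimal, JDK-only sketch; fileUri is a hypothetical helper, not part of Camel, and it performs no URL encoding, which is fine for the values used here but would matter for values containing characters such as & or spaces.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a file endpoint URI from an ordered option map. */
public class FileUri {
    public static String fileUri(String directory, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&", "?", "");
        query.setEmptyValue(""); // no options -> no trailing '?'
        options.forEach((key, value) -> query.add(key + "=" + value));
        return "file:" + directory + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("moveFailed", ".error");
        options.put("autoCreate", "true");
        options.put("readLockLoggingLevel", "WARN");
        options.put("shuffle", "true");
        options.put("readLock", "idempotent");
        options.put("idempotentRepository", "#fileConsumerRepo"); // '#' refers to a bean by name
        options.put("readLockRemoveOnCommit", "true");
        // Prints exactly the URI used in the route above.
        System.out.println(fileUri("/my/test/dir", options));
    }
}
```

Inside a RouteBuilder, the resulting string could then be passed to from(...) in place of the literal.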
Now deploy your route to multiple servers and watch the magic! To try this on a single machine, you can run several JBoss EAP or Karaf instances side by side using a port offset. If you are looking for more examples, check out this sample project put together by fellow Red Hatter Josh Reagan.
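Before deploying to real servers, the effect of readLock=idempotent combined with shuffle=true can be simulated in a single JVM, with threads standing in for servers and a concurrent set standing in for the shared CAMEL_MESSAGEPROCESSED table. This is a hypothetical sketch (MultiServerDemo and run are illustrative names, and no Camel code is involved); it shows that each file is handled exactly once, whichever worker claims it first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy simulation of several "servers" polling one shared directory. Hypothetical names. */
public class MultiServerDemo {
    public static Map<String, AtomicInteger> run(int servers, List<String> files)
            throws InterruptedException {
        Set<String> repository = ConcurrentHashMap.newKeySet(); // stands in for the shared table
        Map<String, AtomicInteger> timesProcessed = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(servers);
        for (int s = 0; s < servers; s++) {
            pool.submit(() -> {
                List<String> view = new ArrayList<>(files);
                Collections.shuffle(view); // like shuffle=true: workers rarely start on the same file
                for (String file : view) {
                    if (repository.add(file)) { // lock acquired: only one worker gets here per file
                        timesProcessed.computeIfAbsent(file, f -> new AtomicInteger()).incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return timesProcessed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> files = List.of("a.csv", "b.csv", "c.csv", "d.csv", "e.csv");
        // Each file should report a count of 1, regardless of which worker claimed it.
        run(3, files).forEach((file, count) -> System.out.println(file + " -> " + count.get()));
    }
}
```

In a real deployment the servers are separate JVMs, so the exactly-once guarantee rests on the shared database rather than on an in-memory set.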
Last updated: May 31, 2024