ENTERPRISE INTEGRATION WITH APACHE CAMEL

This post is a follow-up to my first post on using Apache Camel. As I mentioned in that post, my next exercise was to use the SFTP component to automatically fetch the files for me. As I started testing, I realized that I was going to need to handle multiple files, so I started looking at how I was going to handle that in Camel. Initially, I thought: why not just use the route I previously created?

// Build the route
context.addRoutes(new RouteBuilder {
  // Read the file(s) from the directory
  "file:perf?delete=true&idempotent=true" ==> {
    // Split the file up at each line
    split(_.getIn().getBody(classOf[String]).split("\n")) {
      // Then split the processing further across multiple parsers
      loadbalance roundrobin {
        // References to my internal components
        to("direct:x")
        to("direct:y")
        to("direct:z")
      }
    // Join back the results from above, then split them out again
    // with commit-level changes turned off
    }.loadbalance roundrobin {
      to("jdbc:dataSource?resetAutoCommit=false")
      to("jdbc:dataSource?resetAutoCommit=false")
      to("jdbc:dataSource?resetAutoCommit=false")
    }
  }

  // Set up my internal processors with references to my custom component
  "direct:x" process (processor)
  "direct:y" process (processor)
  "direct:z" process (processor)
})

Much to my surprise, I found that this did not work as expected. So, what happened? Well, the first thing I noticed is that none of the files [2 of them initially] got deleted. The second thing I noticed is that twice as many records landed in the database as I expected. I was perplexed as to why this was occurring, so I posted a message on the Apache Camel users list, and after some back and forth, we realized that I was hitting this bug in Apache Camel. After realizing this, one of the main committers on the project, Claus Ibsen, posted a patch for the pre-release version 2.10.

I went ahead and downloaded it and attempted to run the same route, but I got the same behavior. After some back and forth, we never really came to a final conclusion, and I was about to punt on using Apache Camel. Then a revelation came to me: why not split the single route into multiple routes that are, in effect, chained off one another? After some further thought, I came up with the following set of routes:

val context = new DefaultCamelContext(reg)
context.addRoutes(new RouteBuilder {
  //
  // Fetch performance metrics for the first server and store them locally
  //
  "sftp://myid@myserver1//prod/msp/logs/prtlf_logs/msp_prtlf_qps_07/msp_prtlf_qps_m00?include=epf_perf.log.*&password=Mypassword&localWorkDirectory=tmp/yashin&idempotent=true" --> "file:perf?fileName=${file:name.noext}_${in.header.CamelFileHost}.txt"
  //
  // Fetch performance metrics for the second server and store them locally
  //
  "sftp://myid@myserver2//prod/msp/logs/prtlf_logs/msp_prtlf_qps_09/msp_prtlf_qps_m00?include=epf_perf.log.*&password=Mypassword&localWorkDirectory=tmp/recoba&idempotent=true" --> "file:perf?fileName=${file:name.noext}_${in.header.CamelFileHost}.txt"
  //
  // Read the files that land in the perf folder and convert them into a CSV file
  //
  "file:perf?delete=true&idempotent=true&initialDelay=1500&delay=500" ==> {
    process(myProcessor).to("file:perf_outbox")
  }
  //
  // Read the files that land in the perf_outbox folder and convert them into a file
  // that contains a series of insert statements
  //
  "file:perf_outbox?delete=true&idempotent=true&initialDelay=2000&delay=500" ==> {
    process(insertProcessor).to("file:perf_insert")
  }
  //
  // Read the files that land in the perf_insert folder and execute the insert
  // statements. Because this route processes a single file at a time, the
  // split operation works as expected.
  //
  "file:perf_insert?delete=true&idempotent=true&initialDelay=2500&delay=500" ==> {
    split(_.getIn().getBody(classOf[String]).split("\n")).to("jdbc:dataSource")
  }
})
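
For completeness, the routes above lean on a few pieces defined outside the RouteBuilder: the reg registry handed to DefaultCamelContext, the dataSource bean that the jdbc: endpoints look up, and the two processors. Those definitions didn't make it into this post, but a minimal sketch might look like the following (the DBCP pool, its driver and credentials, and the processor bodies are placeholder assumptions, not my actual code):

import javax.sql.DataSource
import org.apache.camel.{Exchange, Processor}
import org.apache.camel.impl.SimpleRegistry
import org.apache.commons.dbcp.BasicDataSource

// A plain DBCP pool; any javax.sql.DataSource would work here.
// The driver, URL, and credentials are placeholders.
val dataSource: DataSource = {
  val ds = new BasicDataSource
  ds.setDriverClassName("com.mysql.jdbc.Driver")
  ds.setUrl("jdbc:mysql://localhost/perf")
  ds.setUsername("perfuser")
  ds.setPassword("perfpass")
  ds
}

// Bind the pool under the name the jdbc: endpoints reference
val reg = new SimpleRegistry
reg.put("dataSource", dataSource)

// Sketch of the CSV-conversion processor; the real parsing lives in
// my custom component. insertProcessor follows the same shape.
val myProcessor = new Processor {
  def process(exchange: Exchange) {
    val lines = exchange.getIn.getBody(classOf[String]).split("\n")
    // ... turn each performance log line into comma-separated fields ...
    exchange.getIn.setBody(lines.mkString("\n"))
  }
}

With reg defined this way, the new DefaultCamelContext(reg) call at the top of the routes wires the jdbc:dataSource endpoints up to the pool.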

A couple of things to note. Because I'm now fetching the files with the SFTP component, and, as the routes indicate, doing so from multiple servers, I need a way to uniquely name the files. This is easily accomplished in Apache Camel because the SFTP component sets a header called CamelFileHost on each exchange that I can use to name the files as they are processed. This is done with the following expression:

${file:name.noext}_${in.header.CamelFileHost}.txt

This essentially takes the incoming file name, strips off the extension, and appends the host name to the end.
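
For instance, with a made-up file name:

epf_perf.log (fetched from myserver1)  ->  epf_perf_myserver1.txt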

This processing now allows me to fetch the files remotely and process the performance log files. In my particular case, this resulted in moving 300MB of log files and processing approximately 300,000 records in a very efficient fashion.