April 07, 2017 (last updated: April 06, 2017)
jboss-fuse
data-grid
hot-rod
protobuf
lucene
https://github.com/alainpham/techlab-fuse-jdg-simple-hotrod
Abstract
Building an integration and services platform with JBoss Fuse is great.
It is even better when you add a distributed in-memory database such as JBoss Data Grid to the mix.
This article will show how to make both technologies work together using the camel-jbossdatagrid component.
We will go through the setup of a JBoss Data Grid server with persistence and see how to use it in a JBoss Fuse application through the remote Hot Rod client.
Furthermore, we will see how to take advantage of Protocol Buffers and Lucene to index data and perform content-based queries.
In the example, we will build REST services to store and retrieve some business events.
In summary, the steps in this tutorial are the following:
Create an empty Fuse project in JBoss Developer Studio with Spring DSL
Add necessary dependencies to the pom.xml file
On the Data Grid server, configure a cache called "event" with an index (Lucene) and a cache store (H2 database as an example)
Launch an instance of H2 database and Data Grid Server
Create a POJO as a model of an event
Annotate the POJO with protobuf annotations to specify which fields should be indexed for queries
Create basic generic camel routes for getting and putting events into the cache
Create a query service with dynamic parameters
Test the services
Deploy the service on a Fuse Server
Prerequisites
Versions used
Access modes to JBoss Data Grid
JBoss Data Grid can be accessed in two different modes: Library Mode and Server Mode.
In Server Mode, Data Grid is installed as a remote server and Fuse accesses the objects through the Hot Rod client.
In Library Mode (or embedded mode), the Fuse engine uses its own JVM memory to hold objects that are part of the Data Grid.
In other words, the Fuse engine acts as a node of the Data Grid cluster.
This mode gives access to advanced features such as transactions and locking and is actually quite easy to set up.
The downside is that it is not possible to perform queries with the infinispan-embedded-query library when Fuse runs in a Karaf container.
The main reason is that infinispan-embedded-query pulls in the full dependencies of libraries such as Hibernate. Hence it is not suited to an OSGi container such as Karaf.
In this article we will explore the possibilities of Server Mode with the Hot Rod client. Library Mode will be discussed in another article.
Create a Fuse project
Start by creating a Fuse project with a Spring Camel context in JBoss Developer Studio.
(Alternatively, it can also be created with a Maven archetype from the command line.)
In JBoss Developer Studio go to File -> New -> Fuse Integration Project
Name the project techlab-fuse-jdg-simple-hotrod
Select runtime version 2.17.0.redhat-630224
Start an empty project with Spring DSL
Click finish
Edit information and dependencies in pom.xml file
Change group id, artifact id to identify the project
Add dependency to camel-jbossdatagrid component : this enables invocation of Data Grid endpoints in a Camel route
Note that the version should match the version of the Data Grid Server.
Add dependency to h2 : as an example of cache store we are using an H2 database
Add dependency to camel-netty4-http, camel-swagger-java, camel-jackson : will be used to expose rest services to access the objects stored in the cache
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-jbossdatagrid</artifactId>
  <version>6.5.1.Final-redhat-1</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-netty4-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-swagger-java</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-jackson</artifactId>
</dependency>
Cache Container configuration on Data Grid Server
In the Data Grid installation folder, open the file standalone/configuration/standalone.xml
In the xml file, declare a cache named "event"
Configure a persistent cache store so that events survive a server reboot. As an example, we will use an H2 database here.
Activate indexing to enable advanced queries using the Lucene engine
Configure eviction so that entries are unloaded from memory when they are not used. This avoids running out of memory.
Evicted entries stay in the cache store and can be reloaded when needed
<server xmlns="urn:jboss:domain:1.6">
  ...
  <subsystem xmlns="urn:jboss:domain:datasources:1.2">
    <datasources>
      <datasource jndi-name="java:jboss/datasources/JdbcDS" pool-name="JdbcDS" enabled="true" use-java-context="true">
        <connection-url>jdbc:h2:tcp://localhost:8942/dgdb</connection-url>
        <driver>h2</driver>
        <security>
          <user-name>sa</user-name>
          <password/>
        </security>
      </datasource>
      <drivers>
        <driver name="h2" module="com.h2database.h2">
          <xa-datasource-class>org.h2.jdbcx.JdbcDataSource</xa-datasource-class>
        </driver>
      </drivers>
    </datasources>
  </subsystem>
  <subsystem xmlns="urn:infinispan:server:core:6.3" default-cache-container="local">
    <cache-container name="local" default-cache="default" statistics="true">
      ...
      <local-cache name="event">
        <!-- Keep at most 50 entries in memory; the least recently used (LRU) entries are evicted first -->
        <eviction strategy="LRU" max-entries="50"/>
        <!-- Indexing is activated; the index is stored locally only -->
        <indexing index="LOCAL">
          <property name="default.directory_provider">filesystem</property>
          <property name="default.indexBase">ispn_index</property>
        </indexing>
        <!-- A mixed store is created to be able to contain entries with string keys and also binary keys -->
        <mixed-keyed-jdbc-store datasource="java:jboss/datasources/JdbcDS" passivation="false" preload="true" purge="false">
          <binary-keyed-table prefix="ISPN_MIX_BKT" create-on-start="true" drop-on-exit="false">
            <id-column name="id" type="VARCHAR"/>
            <data-column name="datum" type="BINARY"/>
            <timestamp-column name="version" type="BIGINT"/>
          </binary-keyed-table>
          <string-keyed-table prefix="ISPN_MIX_STR" create-on-start="true" drop-on-exit="false">
            <id-column name="id" type="VARCHAR"/>
            <data-column name="datum" type="BINARY"/>
            <timestamp-column name="version" type="BIGINT"/>
          </string-keyed-table>
        </mixed-keyed-jdbc-store>
      </local-cache>
    </cache-container>
    <cache-container name="security"/>
  </subsystem>
  ...
</server>
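To make the interplay between eviction and the cache store more concrete, here is a minimal plain-Java sketch of the behaviour configured above. It is only an illustration, not Data Grid code: the class name LruCacheWithStore is hypothetical, a HashMap stands in for the JDBC store, and it assumes that with passivation="false" every write goes to the store as well as to memory.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics LRU eviction backed by a store, it is not Data Grid code.
class LruCacheWithStore<K, V> {

    // Stands in for the JDBC cache store (assumption: it always holds every entry)
    private final Map<K, V> store = new HashMap<>();
    // Stands in for the in-memory data container
    private final LinkedHashMap<K, V> memory;

    LruCacheWithStore(final int maxEntries) {
        // accessOrder=true keeps the least recently used entry at the head of the map
        this.memory = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries; // evict once the limit is exceeded
            }
        };
    }

    void put(K key, V value) {
        store.put(key, value);  // the write also reaches the store
        memory.put(key, value); // may evict the least recently used entry from memory
    }

    V get(K key) {
        V value = memory.get(key);
        if (value == null) {
            value = store.get(key); // evicted entry: reload it from the store
            if (value != null) {
                memory.put(key, value);
            }
        }
        return value;
    }

    boolean isInMemory(K key) {
        return memory.containsKey(key);
    }

    public static void main(String[] args) {
        LruCacheWithStore<String, String> cache = new LruCacheWithStore<>(2);
        cache.put("1", "party started");
        cache.put("2", "police arrived");
        cache.put("3", "host arrested");           // evicts "1" from memory
        System.out.println(cache.isInMemory("1")); // false
        System.out.println(cache.get("1"));        // party started (reloaded from the store)
    }
}
```

With max-entries set to 50 as in the configuration, the 51st entry would push the least recently used one out of memory while it remains available in the store.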
Launch H2 database
Download the H2 database here:
https://storage.googleapis.com/google-code-archive-downloads/v2/code.google.com/h2database/h2-2012-07-13.zip
Run the database with the following parameters
java -cp h2*.jar org.h2.tools.Server -tcp -tcpAllowOthers -tcpPort 8942 -baseDir ./h2dbstore -web -webAllowOthers -webPort 11112
Now we can start the JBoss Data Grid Server by going to its installation folder and running :
bin/standalone.sh
Create a model class for events and annotate it for indexing
Create a class and annotate it so that it is indexed by Data Grid. To enable indexing through Hot Rod, POJOs need to be serialized as Protocol Buffers.
package techlab.model ;
import java.io.Serializable ;
import java.util.Date ;
import org.infinispan.protostream.annotations.ProtoDoc ;
import org.infinispan.protostream.annotations.ProtoField ;
@ProtoDoc ( "@Indexed" )
public class Event implements Serializable {
private static final long serialVersionUID = 1L ;
private String uid ;
private Date timestmp ;
private String name ;
private String content ;
@ProtoField ( number = 1 )
public String getUid () {
return uid ;
}
public void setUid ( String uid ) {
this . uid = uid ;
}
@ProtoDoc ( "@IndexedField(index = true, store = false)" )
@ProtoField ( number = 2 )
public Date getTimestmp () {
return timestmp ;
}
public void setTimestmp ( Date timestmp ) {
this . timestmp = timestmp ;
}
@ProtoDoc ( "@IndexedField(index = true, store = false)" )
@ProtoField ( number = 3 )
public String getName () {
return name ;
}
public void setName ( String name ) {
this . name = name ;
}
@ProtoField ( number = 4 )
public String getContent () {
return content ;
}
public void setContent ( String content ) {
this . content = content ;
}
}
Create REST services to get and put entries into the cache
Write a CacheManager factory.
Note that protobuf schemas can be generated from the annotated class and then registered on the Data Grid Server in the reserved metadata cache.
package techlab.factory ;
import java.io.IOException ;
import org.infinispan.client.hotrod.RemoteCache ;
import org.infinispan.client.hotrod.RemoteCacheManager ;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder ;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller ;
import org.infinispan.protostream.SerializationContext ;
import org.infinispan.protostream.annotations.ProtoSchemaBuilder ;
import org.infinispan.protostream.annotations.ProtoSchemaBuilderException ;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants ;
import techlab.model.Event ;
public class RemoteCacheManagerFactory {
ConfigurationBuilder clientBuilder ;
public RemoteCacheManagerFactory ( String hostname , int port ) {
clientBuilder = new ConfigurationBuilder ();
clientBuilder . addServer ()
. host ( hostname )
. port ( port )
. marshaller ( new ProtoStreamMarshaller ());
}
public RemoteCacheManager newRemoteCacheManager () throws ProtoSchemaBuilderException , IOException {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager ( clientBuilder . build ());
SerializationContext ctx = ProtoStreamMarshaller . getSerializationContext ( remoteCacheManager );
ProtoSchemaBuilder protoSchemaBuilder = new ProtoSchemaBuilder ();
//create a protobuf schema file from the annotated class. Protobuf marshallers and unmarshallers are generated automatically
String eventSchema = protoSchemaBuilder
. fileName ( "event.proto" )
. packageName ( "techlab" )
. addClass ( Event . class )
. build ( ctx );
//register the protobuf schema in the remote cache
RemoteCache < String , String > metadataCache = remoteCacheManager . getCache ( ProtobufMetadataManagerConstants . PROTOBUF_METADATA_CACHE_NAME );
metadataCache . put ( "event.proto" , eventSchema );
//check if there is an error with the schemas
String errors = metadataCache . get ( ProtobufMetadataManagerConstants . ERRORS_KEY_SUFFIX );
if ( errors != null ) {
throw new IllegalStateException ( "Some Protobuf schema files contain errors:\n" + errors );
}
return remoteCacheManager ;
}
}
Configure the factory in the Spring context
<beans>
  ...
  <!-- ########################################################### -->
  <!-- Definition of remote cache Manager -->
  <!-- ########################################################### -->
  <bean class="techlab.factory.RemoteCacheManagerFactory" id="remoteCacheManagerFactory">
    <constructor-arg value="localhost"/>
    <constructor-arg value="11222"/>
  </bean>
  <bean factory-bean="remoteCacheManagerFactory" factory-method="newRemoteCacheManager"
        id="cacheManager"/>
  ...
</beans>
Add a reusable endpoint to the Camel context
<camelContext id= "techlab-fuse-jdg-library-mode" xmlns= "http://camel.apache.org/schema/spring" >
<!-- Data Grid endpoint -->
<endpoint id= "datagrid" uri= "infinispan://?cacheContainer=#cacheManager" />
</camelContext>
Use the REST DSL to create routes and expose services for the basic operations
<camelContext id= "techlab-fuse-jdg-library-mode" xmlns= "http://camel.apache.org/schema/spring" >
<!-- Data Grid endpoint -->
<endpoint id= "datagrid" uri= "infinispan://?cacheContainer=#cacheManager" />
<restConfiguration bindingMode= "json" component= "netty4-http"
enableCORS= "true" port= "7123" apiContextPath= "/api-doc" >
<dataFormatProperty key= "prettyPrint" value= "true" />
</restConfiguration>
<rest id= "svc" path= "" >
<get id= "getOp" uri= "{cacheName}/{uid}" >
<description> Get an entry with an ID from a cache</description>
<to uri= "direct:getOp" />
</get>
<put id= "putOp" uri= "{cacheName}/{uid}" type= "techlab.model.Event" >
<description> Inserts an entry with the given ID and content in a cache</description>
<to uri= "direct:putOp" />
</put>
</rest>
<!-- rest service to get an entry with the key -->
<route id= "getOpRoute" >
<from id= "getOpStarter" uri= "direct:getOp" />
<setHeader headerName= "CamelInfinispanKey" id= "getOpRouteSetKey" >
<simple> ${headers.uid}</simple>
</setHeader>
<setHeader headerName= "CamelInfinispanCacheName" id= "getOpRouteSetCacheName" >
<simple> ${headers.cacheName}</simple>
</setHeader>
<setHeader headerName= "CamelInfinispanOperation" id= "getOpRouteSetOperation" >
<constant> CamelInfinispanOperationGet</constant>
</setHeader>
<to id= "getOpRouteToDataGrid" uri= "ref:datagrid" />
<setBody id= "getOpRouteSetResponse" >
<simple> ${header.CamelInfinispanOperationResult}</simple>
</setBody>
</route>
<!-- rest service to put entries into a cache -->
<route id= "putOpRoute" >
<from id= "putOpStarter" uri= "direct:putOp" />
<setHeader headerName= "CamelInfinispanKey" id= "putOpRouteSetKey" >
<simple> ${headers.uid}</simple>
</setHeader>
<setHeader headerName= "CamelInfinispanCacheName" id= "putOpRouteSetCacheName" >
<simple> ${headers.cacheName}</simple>
</setHeader>
<setHeader headerName= "CamelInfinispanOperation" id= "putOpRouteSetOperation" >
<constant> CamelInfinispanOperationPut</constant>
</setHeader>
<setHeader headerName= "CamelInfinispanValue" id= "putOpRouteSetValue" >
<simple> ${body}</simple>
</setHeader>
<to id= "putOpRouteToDataGrid" uri= "ref:datagrid" />
<setBody id= "putOpRouteSetResponse" >
<simple> Value inserted</simple>
</setBody>
</route>
</camelContext>
Create a query service with dynamic parameters
Define a REST service that accepts arbitrary HTTP query parameters
(e.g. http://localhost:7123/query/event/techlab.model.Event?timestmp=1462208399999&name=ended)
<get id= "queryOp" uri= "query/{cacheName}/{type}" >
<description> Allows to query based on object fields using lucene search engine</description>
<to uri= "direct:queryOp" />
</get>
Create classes that generate Data Grid queries. Note that these classes are fairly generic: they use reflection and are therefore suitable for any data model.
package techlab.dg ;
import java.beans.BeanInfo ;
import java.beans.IntrospectionException ;
import java.beans.Introspector ;
import java.beans.PropertyDescriptor ;
import java.util.Date ;
import java.util.Map ;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder ;
import org.infinispan.query.dsl.FilterConditionContext ;
import org.infinispan.query.dsl.Query ;
import org.infinispan.query.dsl.QueryBuilder ;
import org.infinispan.query.dsl.QueryFactory ;
public class GenericQuery implements InfinispanQueryBuilder {
private Map < String , Object > params ;
private BeanInfo info ;
private Class type ;
public GenericQuery ( String typeName , Map < String , Object > params ) throws ClassNotFoundException , IntrospectionException {
super ();
//inspect the searched class in order to get the fields that can be queried
type = Class . forName ( typeName );
info = Introspector . getBeanInfo ( type , Object . class );
this . params = params ;
}
@Override
public Query build ( QueryFactory < Query > queryFactory ) {
QueryBuilder < Query > qb = queryFactory . from ( type );
FilterConditionContext ctx = null ;
// for each property of the class we look if a parameter has been set
for ( PropertyDescriptor pd : info . getPropertyDescriptors () ){
Object searchValue = this . params . get ( pd . getName ());
//note: this code does not check whether the property is actually indexed on the server
//a search criterion is added for every property that has a matching parameter in the headers
if ( searchValue != null ){
//if field is a date convert the type explicitly
if ( pd . getPropertyType (). equals ( Date . class )){
searchValue = new Date ( Long . parseLong (( String ) searchValue ));
}
if ( ctx == null ){ //first condition
ctx = qb . having ( pd . getName ()). eq ( searchValue );
} else { //additional conditions with and operator
ctx . and (). having ( pd . getName ()). eq ( searchValue );
}
}
}
return qb . build ();
}
}
package techlab.dg ;
import java.beans.IntrospectionException ;
import org.apache.camel.Exchange ;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder ;
public class GenerateQuery {
public InfinispanQueryBuilder getBuilder ( Exchange ex ) throws ClassNotFoundException , IntrospectionException {
InfinispanQueryBuilder qb = new GenericQuery ( ex . getIn (). getHeader ( "type" , String . class ), ex . getIn (). getHeaders ());
return qb ;
}
}
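The reflection step is the part that makes the query generic, so here is a small stand-alone sketch of it that runs without Data Grid or Camel. It only shows how bean properties are matched against the incoming parameters and how date values are converted; it does not build a real query. The names QueryConditionSketch, SampleEvent and conditionsFor are hypothetical and exist only for this illustration.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: reproduces the property matching of GenericQuery with plain JDK classes.
class QueryConditionSketch {

    // Hypothetical stand-in for techlab.model.Event
    public static class SampleEvent {
        private String name;
        private Date timestmp;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Date getTimestmp() { return timestmp; }
        public void setTimestmp(Date timestmp) { this.timestmp = timestmp; }
    }

    // Returns one "property equals value" condition per bean property found in params
    static Map<String, Object> conditionsFor(Class<?> type, Map<String, Object> params) {
        PropertyDescriptor[] properties;
        try {
            properties = Introspector.getBeanInfo(type, Object.class).getPropertyDescriptors();
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Cannot inspect " + type, e);
        }
        Map<String, Object> conditions = new LinkedHashMap<>();
        for (PropertyDescriptor pd : properties) {
            Object value = params.get(pd.getName());
            if (value == null) {
                continue; // no parameter supplied for this property
            }
            if (Date.class.equals(pd.getPropertyType())) {
                // dates arrive as epoch milliseconds in text form
                value = new Date(Long.parseLong(value.toString()));
            }
            conditions.put(pd.getName(), value);
        }
        return conditions;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("name", "incident");
        headers.put("timestmp", "1491607080000");
        headers.put("cacheName", "event"); // not a bean property, so it is ignored
        System.out.println(conditionsFor(SampleEvent.class, headers));
    }
}
```

Headers that do not correspond to a bean property, such as cacheName or type, are simply skipped, which is why the whole header map can be handed over as parameters.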
Declare the beans, service endpoint and route in the Camel context
<beans>
...
<bean id= "generateQuery" class= "techlab.dg.GenerateQuery" />
...
<camelContext id= "techlab-fuse-jdg-library-mode" xmlns= "http://camel.apache.org/schema/spring" >
...
<rest id= "svc" path= "" >
...
<get id= "queryOp" uri= "query/{cacheName}/{type}" >
<description> Allows to query based on object fields using lucene search engine</description>
<to uri= "direct:queryOp" />
</get>
</rest>
...
<!-- rest service to query caches with any indexed field -->
<route id= "queryOpRoute" >
<from id= "queryOpStarter" uri= "direct:queryOp" />
<log message= "Query headers : ${headers}" ></log>
<setHeader headerName= "CamelInfinispanCacheName" id= "queryOpRouteSetCacheName" >
<simple> ${headers.cacheName}</simple>
</setHeader>
<setHeader headerName= "CamelInfinispanOperation" id= "queryOpRouteSetOperation" >
<constant> CamelInfinispanOperationQuery</constant>
</setHeader>
<setHeader headerName= "CamelInfinispanQueryBuilder" id= "queryOpRouteSetBuilder" >
<method ref= "generateQuery" method= "getBuilder" />
</setHeader>
<to id= "queryOpRouteToDataGrid" uri= "ref:datagrid" />
<setBody id= "queryOpRouteSetResponse" >
<simple> ${header.CamelInfinispanOperationResult}</simple>
</setBody>
</route>
</camelContext>
</beans>
Test the project
Run the Fuse project on your developer machine
mvn clean package camel:run
Insert a few entries by running a curl command
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"uid": "1",
"timestmp": "2017-04-07T19:30:00.000Z",
"name": "start",
"content": "party started" }' 'http://localhost:7123/event/1'
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"uid": "2",
"timestmp": "2017-04-07T22:15:00.000Z",
"name": "incident",
"content": "police arrived" }' 'http://localhost:7123/event/2'
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"uid": "3",
"timestmp": "2017-04-07T23:18:00.000Z",
"name": "incident",
"content": "host arrested" }' 'http://localhost:7123/event/3'
curl -X PUT --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"uid": "4",
"timestmp": "2017-04-07T23:20:00.000Z",
"name": "end",
"content": "party ended" }' 'http://localhost:7123/event/4'
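If you prefer to drive the services from Java rather than curl, the following sketch builds the same PUT request and the matching GET. It assumes Java 11 or later for java.net.http, which is newer than the stack used in this article, and the class and method names (EventRestClientSketch, putEvent, getEvent) are hypothetical. The host and port come from the restConfiguration shown earlier.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative only: a Java counterpart of the curl commands above (Java 11+ assumed).
class EventRestClientSketch {

    // Port 7123 is the one declared in the restConfiguration element
    static final String BASE_URL = "http://localhost:7123";

    // PUT {cacheName}/{uid} with a JSON body
    static HttpRequest putEvent(String cacheName, String uid, String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + cacheName + "/" + uid))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // GET {cacheName}/{uid}
    static HttpRequest getEvent(String cacheName, String uid) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + cacheName + "/" + uid))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        String json = "{\"uid\":\"1\",\"timestmp\":\"2017-04-07T19:30:00.000Z\","
                + "\"name\":\"start\",\"content\":\"party started\"}";
        try {
            HttpResponse<String> put =
                    client.send(putEvent("event", "1", json), HttpResponse.BodyHandlers.ofString());
            System.out.println(put.statusCode() + " " + put.body());
            HttpResponse<String> get =
                    client.send(getEvent("event", "1"), HttpResponse.BodyHandlers.ofString());
            System.out.println(get.body());
        } catch (IOException e) {
            // The Fuse project must be running for the calls to succeed
            System.err.println("Service not reachable: " + e.getMessage());
        }
    }
}
```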
List all events through a query without parameters
curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event'
[ {
"uid" : "1" ,
"timestmp" : 1491593400000 ,
"name" : "start" ,
"content" : "party started"
}, {
"uid" : "2" ,
"timestmp" : 1491603300000 ,
"name" : "incident" ,
"content" : "police arrived"
}, {
"uid" : "3" ,
"timestmp" : 1491607080000 ,
"name" : "incident" ,
"content" : "host arrested"
}, {
"uid" : "4" ,
"timestmp" : 1491607200000 ,
"name" : "end" ,
"content" : "party ended"
} ]
List all incidents through a query with a parameter
curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event?name=incident'
[ {
"uid" : "2" ,
"timestmp" : 1491603300000 ,
"name" : "incident" ,
"content" : "police arrived"
}, {
"uid" : "3" ,
"timestmp" : 1491607080000 ,
"name" : "incident" ,
"content" : "host arrested"
} ]
List all incidents at a certain hour with 2 parameters
curl -X GET --header 'Accept: application/json' 'http://localhost:7123/query/event/techlab.model.Event?name=incident&timestmp=1491607080000'
[ {
"uid" : "3" ,
"timestmp" : 1491607080000 ,
"name" : "incident" ,
"content" : "host arrested"
} ]
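Note that the timestmp query parameter is expressed in epoch milliseconds, whereas the events were inserted with ISO-8601 timestamps. As a convenience, here is a minimal sketch for converting between the two with java.time; the class name TimestampParamSketch is hypothetical.

```java
import java.time.Instant;

// Illustrative only: converts between ISO-8601 timestamps and epoch milliseconds.
class TimestampParamSketch {

    // ISO-8601 instant in UTC, e.g. "2017-04-07T23:18:00.000Z", to epoch milliseconds
    static long toEpochMillis(String iso8601) {
        return Instant.parse(iso8601).toEpochMilli();
    }

    // Epoch milliseconds back to an ISO-8601 instant in UTC
    static String toIso8601(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2017-04-07T23:18:00.000Z")); // 1491607080000
        System.out.println(toIso8601(1491607080000L));                 // 2017-04-07T23:18:00Z
    }
}
```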
Alternatively you can also use swagger-ui to test the services
Deploy Fuse project to Fuse Server (Karaf)
In the pom.xml file of the Fuse project, add a DynamicImport-Package instruction to the maven-bundle-plugin configuration.
<plugin>
  <groupId>org.apache.felix</groupId>
  <artifactId>maven-bundle-plugin</artifactId>
  <version>${version.maven-bundle-plugin}</version>
  <extensions>true</extensions>
  <configuration>
    <instructions>
      <Bundle-SymbolicName>techlab-fuse-jdg-simple-hotrod</Bundle-SymbolicName>
      <Bundle-Name>techlab-fuse-jdg-simple-hotrod</Bundle-Name>
      <DynamicImport-Package>*</DynamicImport-Package>
    </instructions>
  </configuration>
</plugin>
Generate bundle for deployment
mvn clean package
Connect to the Fuse console and run these commands to install the required dependencies
features:install camel-swagger-java camel-netty4-http camel-jackson
features:addurl mvn:org.apache.camel/camel-jbossdatagrid/6.5.1.Final-redhat-1/xml/features
features:install camel-jbossdatagrid
Install our Fuse project bundle
osgi:install -s file:<PATH_TO_PROJECT>/techlab-fuse-jdg-simple-hotrod/target/techlab-fuse-jdg-simple-hotrod-1.0.0-SNAPSHOT.jar
That's it: we now have a running Data Grid with persistence and indexes, and we are able to access it from a Fuse project.
Thanks for reading