Activemq Publish Subscribe Example Java
In this post, we will learn Asynchronous messaging with the help of the Spring Boot JMS Pub-Sub model. This post will focus on implementing JMS with Spring Boot , which doesn't take long at all to setup.
JMS and message queues, in general, bring some certain advantages over using RESTful services such as:
What is Loose coupling?
The services do not interact directly and only know where the message queue is, where one service sends messages(Publisher) and the other(Subscribers) receives them. Subscribers/Publishers can be added/removed dynamically.
What is Asynchronous messaging?
As the processing time of the message cannot be guaranteed, the client that sent the message can carry on asynchronously to the completion of the transaction. Due to this, the queue should be used to write data (POST if you're thinking in a RESTful mindset).
What is Redundancy?
A message must confirm that it has completed its transaction and that it can now be removed from the queue, but if the transaction fails it can be reprocessed. The messages can also be stored in a database allowing them to continue later on even if the server stops.
Why ActiveMQ-TOPIC?
ActiveMQprovides the Publish-Subscribepattern (pub-sub) for building Jms message distributed systems.
How ActiveMQ-TOPIC works?
When you publish a message, all active subscribers will receive a copy of the message.
ActiveMQ Broker
You may download ActiveMQ Broker from: https://activemq.apache.org/activemq-5013000-release
Once ActiveMQ is started successfully, open the ActiveMQ Admin URL http://localhost:8161/admin/
You can access Subscribers from its tab
Spring Boot JMS Implementation
Spring Boot Producer: SpringActiveMqTopicProducer
In this application, we will publish a message on a topic.
Project Structure
The project structure would be like as below.
Maven File (pom.xml)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example.activemq</groupId> <artifactId>SpringActiveMqTopicProducer</artifactId> <version>0.0.1</version> <packaging>jar</packaging> <name>SpringActiveMqTopicProducer</name> <description>SpringActiveMqTopicProducer</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.4</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20210307</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Application Configuration (application.properties)
jsa.activemq.broker.url=tcp://localhost:61616 jsa.activemq.borker.username=admin jsa.activemq.borker.password=admin jsa.activemq.topic=jsa-topic spring.jms.pub-sub-domain=true
Application Runner: SpringActiveMqTopicProducerApplication
run method has been overridden in this java class as below for publishing messages to the topic 'jsa-topic'
package com.the.basic.tech.info.activemq; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import com.the.basic.tech.info.activemq.jms.JmsPublisher; import com.the.basic.tech.info.activemq.models.Company; import com.the.basic.tech.info.activemq.models.Product; @SpringBootApplication public class SpringActiveMqTopicProducerApplication implements CommandLineRunner { @Autowired JmsPublisher publisher; public static void main(String[] args) { SpringApplication.run(SpringActiveMqTopicProducerApplication.class, args); } @Override public void run(String... args) throws Exception { /* * Apple company & products */ // initial company and products Product iphone7 = new Product("Iphone 7"); Product iPadPro = new Product("IPadPro"); List<Product> appleProducts = new ArrayList<Product>(Arrays.asList(iphone7, iPadPro)); Company apple = new Company("Apple", appleProducts); // send message to ActiveMQ publisher.send(apple); /* * Samsung company and products */ Product galaxyS8 = new Product("Galaxy S8"); Product gearS3 = new Product("Gear S3"); List<Product> samsungProducts = new ArrayList<Product>(Arrays.asList(galaxyS8, gearS3)); Company samsung = new Company("Samsung", samsungProducts); /* * send message to ActiveMQ */ publisher.send(samsung); } }
ConnectionFactoryConfig
package com.the.basic.tech.info.activemq.config; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType; @Configuration @EnableJms public class ConnectionFactoryConfig { @Value("${jsa.activemq.broker.url}") String brokerUrl; @Value("${jsa.activemq.borker.username}") String userName; @Value("${jsa.activemq.borker.password}") String password; /* * Initial ConnectionFactory */ @Bean public ConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(brokerUrl); connectionFactory.setUserName(userName); connectionFactory.setPassword(password); return connectionFactory; } @Bean // Serialize message content to json using TextMessage public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } /* * Used for sending Messages. */ @Bean public JmsTemplate jmsTemplate(){ JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(connectionFactory()); template.setMessageConverter(jacksonJmsMessageConverter()); template.setPubSubDomain(true); return template; } }
Model Layer
Company.java
package com.the.basic.tech.info.activemq.models; import java.util.List; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Company.class) public class Company { private String name; private List<Product> products; public Company(){ } public Company(String name, List<Product> products){ this.name = name; this.products = products; } // name public String getName() { return name; } public void setName(String name) { this.name = name; } // products public void setProducts(List<Product> products){ this.products = products; } public List<Product> getProducts(){ return this.products; } /** * * Show Detail View */ public String toString(){ JSONObject jsonInfo = new JSONObject(); try { jsonInfo.put("name", this.name); JSONArray productArray = new JSONArray(); if (this.products != null) { this.products.forEach(product -> { JSONObject subJson = new JSONObject(); try { subJson.put("name", product.getName()); } catch (JSONException e) {} productArray.put(subJson); }); } jsonInfo.put("products", productArray); } catch (JSONException e1) {} return jsonInfo.toString(); } }
Product.java
package com.the.basic.tech.info.activemq.models; import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Product.class) public class Product { private String name; private Company company; public Product(){ } public Product(String name){ this.name = name; } public Product(String name, Company company){ this.name = name; this.company = company; } // name public String getName() { return name; } public void setName(String name) { this.name = name; } // products public void setCompany(Company company){ this.company = company; } public Company getCompany(){ return this.company; } }
JMS Publisher (JmsPublisher.java)
For publishing messages to the ActiveMQ topic
package com.the.basic.tech.info.activemq.jms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import com.the.basic.tech.info.activemq.models.Company; @Component public class JmsPublisher { private static final Logger logger = LoggerFactory.getLogger(JmsPublisher.class); @Autowired JmsTemplate jmsTemplate; @Value("${jsa.activemq.topic}") String topic; public void send(Company apple){ jmsTemplate.convertAndSend(topic, apple); logger.info("Message : {} published to topic: {} successfully.", apple.toString(), topic); } }
Spring Boot Consumer: SpringActiveMqTopicConsumer
In this application, we will consume a message from the topic.
Project Structure
The project structure would be like as below.
Maven File (pom.xml)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example.activemq</groupId> <artifactId>SpringActiveMqTopicConsumer</artifactId> <version>0.0.1</version> <packaging>jar</packaging> <name>SpringActiveMqTopicConsumer</name> <description>SpringActiveMqTopicConsumer</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.4</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20210307</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Application Configuration (application.properties)
jsa.activemq.broker.url=tcp://localhost:61616 jsa.activemq.borker.username=admin jsa.activemq.borker.password=admin jsa.activemq.topic=jsa-topic spring.jms.pub-sub-domain=true
Application Runner: SpringActiveMqTopicConsumerApplication
package com.the.basic.tech.info.activemq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringActiveMqTopicConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringActiveMqTopicConsumerApplication.class, args); } }
ConnectionFactoryConfig
package com.the.basic.tech.info.activemq.config; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType; @Configuration public class ConnectionFactoryConfig { @Value("${jsa.activemq.broker.url}") String brokerUrl; @Value("${jsa.activemq.borker.username}") String userName; @Value("${jsa.activemq.borker.password}") String password; /* * Initial ConnectionFactory */ @Bean public ConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(brokerUrl); connectionFactory.setUserName(userName); connectionFactory.setPassword(password); return connectionFactory; } @Bean // Serialize message content to json using TextMessage public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } @Bean public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(true); factory.setMessageConverter(jacksonJmsMessageConverter()); configurer.configure(factory, connectionFactory); return factory; } }
Model Layer
Company.java
package com.the.basic.tech.info.activemq.models; import java.util.List; import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Company.class) public class Company { private String name; private List<Product> products; public Company() { } public Company(String name, List<Product> products) { this.name = name; this.products = products; } // name public String getName() { return name; } public void setName(String name) { this.name = name; } // products public void setProducts(List<Product> products) { this.products = products; } public List<Product> getProducts() { return this.products; } @Override public String toString() { return "Company [name=" + name + ", products=" + products + "]"; } }
Product.java
package com.the.basic.tech.info.activemq.models; import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Product.class) public class Product { private String name; @Override public String toString() { return "Product [name=" + name + ", company=" + company + "]"; } private Company company; public Product(){ } public Product(String name){ this.name = name; } public Product(String name, Company company){ this.name = name; this.company = company; } // name public String getName() { return name; } public void setName(String name) { this.name = name; } // products public void setCompany(Company company){ this.company = company; } public Company getCompany(){ return this.company; } }
JMS Subcriber(JmsSubcriber.java)
For consuming messages from ActiveMQ topic
package com.the.basic.tech.info.activemq.jms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import com.the.basic.tech.info.activemq.models.Company; @Component public class JmsSubcriber { private static final Logger logger = LoggerFactory.getLogger(JmsSubcriber.class); @JmsListener(destination = "${jsa.activemq.topic}") public void receive(Company msg){ logger.info("Recieved Message: {}", msg.toString()); } }
Spring Boot JMS: Application Logs
JmsPublisher Logs:
2021-06-08 20:52:36.628 INFO 20636 --- [ main] a.SpringActiveMqTopicProducerApplication : No active profile set, falling back to default profiles: default 2021-06-08 20:52:38.079 INFO 20636 --- [ main] a.SpringActiveMqTopicProducerApplication : Started SpringActiveMqTopicProducerApplication in 2.611 seconds (JVM running for 3.531) 2021-06-08 20:52:38.671 INFO 20636 --- [ main] c.t.b.t.info.activemq.jms.JmsPublisher : Message : {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]} published to topic: jsa-topic successfully. 2021-06-08 20:52:38.703 INFO 20636 --- [ main] c.t.b.t.info.activemq.jms.JmsPublisher : Message : {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]} published to topic: jsa-topic successfully.
Jms Subscriber Logs:
2021-06-08 20:54:13.700 INFO 2864 --- [ main] a.SpringActiveMqTopicConsumerApplication : No active profile set, falling back to default profiles: default 2021-06-08 20:54:15.137 INFO 2864 --- [ main] a.SpringActiveMqTopicConsumerApplication : Started SpringActiveMqTopicConsumerApplication in 2.208 seconds (JVM running for 2.93) 2021-06-08 20:54:33.900 INFO 2864 --- [ntContainer#0-1] c.t.b.t.info.activemq.jms.JmsSubcriber : Recieved Message: Company [name=Apple, products=[Product [name=Iphone 7, company=null], Product [name=IPadPro, company=null]]] 2021-06-08 20:54:33.916 INFO 2864 --- [ntContainer#0-1] c.t.b.t.info.activemq.jms.JmsSubcriber : Recieved Message: Company [name=Samsung, products=[Product [name=Galaxy S8, company=null], Product [name=Gear S3, company=null]]]
Download Source Code (Attached)
Have a great day ahead 🙂
Source: https://thebasictechinfo.com/interview-preparation/spring-boot-jms-activemq-publish-subscribe-pattern-example/
0 Response to "Activemq Publish Subscribe Example Java"
Postar um comentário