ActiveMQ Publish-Subscribe Example in Java

In this post, we will learn asynchronous messaging with the help of the Spring Boot JMS pub-sub model. The post focuses on implementing JMS with Spring Boot, which doesn't take long to set up.

JMS and message queues in general bring certain advantages over RESTful services, including the following:

What is Loose coupling?

The services do not interact directly; they only know where the message queue is. One service (the publisher) sends messages and the others (the subscribers) receive them. Subscribers and publishers can be added or removed dynamically.

What is Asynchronous messaging?

Because the processing time of a message cannot be guaranteed, the client that sent it can carry on with other work instead of waiting for the transaction to complete. For this reason, a queue is best suited to write operations (POST, if you're thinking in a RESTful mindset).
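To make the idea concrete, here is a minimal plain-Java sketch of a sender that hands a message over and moves on while a slower consumer processes it later. It is not ActiveMQ or JMS code: the class `AsyncSendSketch` and its methods are hypothetical names, and an in-memory `LinkedBlockingQueue` stands in for the broker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a broker queue (assumption, not JMS).
class AsyncSendSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // The sender only enqueues the message and returns immediately.
    void send(String message) {
        queue.add(message);
    }

    // Returns the next message, or null if nothing is waiting.
    String receive() {
        return queue.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSendSketch broker = new AsyncSendSketch();
        broker.send("create-order");
        System.out.println("Sender continues without waiting");

        // The consumer runs on its own thread and processes the message later.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(200); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Processed: " + broker.receive());
        });
        consumer.start();
        consumer.join();
    }
}
```

The sender never blocks on the consumer, which is the property the paragraph above describes.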

What is Redundancy?

The consumer must confirm that it has completed its transaction before a message is removed from the queue; if the transaction fails, the message can be reprocessed. Messages can also be stored in a database, so processing can continue later even if the server stops.
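The acknowledge-or-retry behaviour described above can be sketched in plain Java. This is only an illustration under assumptions: `RedeliverySketch`, `deliverAll`, and the `maxAttempts` limit are hypothetical names, the queue lives in memory, and a real broker applies its own redelivery policy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory queue that keeps a message until it is acknowledged.
class RedeliverySketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxAttempts;

    RedeliverySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    void send(String message) {
        pending.addLast(message);
    }

    // Delivers every pending message. The handler returns true to acknowledge.
    // A message that is not acknowledged is redelivered up to maxAttempts times;
    // messages that never succeed are returned to the caller.
    List<String> deliverAll(Predicate<String> handler) {
        List<String> failed = new ArrayList<>();
        while (!pending.isEmpty()) {
            String message = pending.peekFirst(); // stays queued until resolved
            boolean acknowledged = false;
            for (int attempt = 1; attempt <= maxAttempts && !acknowledged; attempt++) {
                acknowledged = handler.test(message);
            }
            pending.removeFirst();
            if (!acknowledged) {
                failed.add(message);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        RedeliverySketch queue = new RedeliverySketch(3);
        queue.send("payment-1");
        int[] calls = {0};
        // The handler fails on its first call and succeeds on the second.
        List<String> failed = queue.deliverAll(m -> ++calls[0] >= 2);
        System.out.println("Attempts: " + calls[0] + ", failed: " + failed);
    }
}
```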

Why ActiveMQ-TOPIC?

ActiveMQ provides the publish-subscribe pattern (pub-sub) for building distributed systems based on JMS messages.

How does an ActiveMQ topic work?

When you publish a message, all active subscribers will receive a copy of the message.
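The fan-out behaviour of a topic can be sketched with a few lines of plain Java. This is an in-memory illustration only, not the ActiveMQ implementation; `TopicSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory topic: every active subscriber gets each message.
class TopicSketch {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void unsubscribe(Consumer<String> subscriber) {
        subscribers.remove(subscriber);
    }

    // Delivers a copy of the message to each currently active subscriber
    // and returns how many subscribers received it.
    int publish(String message) {
        int delivered = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> inboxA = new ArrayList<>();
        List<String> inboxB = new ArrayList<>();
        Consumer<String> a = inboxA::add;
        Consumer<String> b = inboxB::add;

        topic.subscribe(a);
        topic.subscribe(b);
        topic.publish("first");   // both subscribers are active

        topic.unsubscribe(b);
        topic.publish("second");  // only subscriber a is still active

        System.out.println("A received " + inboxA);
        System.out.println("B received " + inboxB);
    }
}
```

A subscriber that is not active at publish time simply misses the message in this sketch, which mirrors the "all active subscribers" wording above.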


ActiveMQ Broker

You may download ActiveMQ Broker from: https://activemq.apache.org/activemq-5013000-release


Once ActiveMQ is started successfully, open the ActiveMQ Admin URL http://localhost:8161/admin/

You can view the subscribers on the Subscribers tab.


Spring Boot JMS Implementation

Spring Boot Producer: SpringActiveMqTopicProducer

In this application, we will publish a message on a topic.

Project Structure

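The project consists of pom.xml, application.properties, the application runner, ConnectionFactoryConfig, the Company and Product models, and JmsPublisher, each listed in the sections that follow.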

Maven File (pom.xml)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example.activemq</groupId>
        <artifactId>SpringActiveMqTopicProducer</artifactId>
        <version>0.0.1</version>
        <packaging>jar</packaging>

        <name>SpringActiveMqTopicProducer</name>
        <description>SpringActiveMqTopicProducer</description>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.4</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>

            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
            <dependency>
                <groupId>org.json</groupId>
                <artifactId>json</artifactId>
                <version>20210307</version>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

Application Configuration (application.properties)

    jsa.activemq.broker.url=tcp://localhost:61616
    jsa.activemq.borker.username=admin
    jsa.activemq.borker.password=admin
    jsa.activemq.topic=jsa-topic
    spring.jms.pub-sub-domain=true

Application Runner: SpringActiveMqTopicProducerApplication

This class overrides the run method to publish messages to the topic 'jsa-topic':

    package com.the.basic.tech.info.activemq;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    import com.the.basic.tech.info.activemq.jms.JmsPublisher;
    import com.the.basic.tech.info.activemq.models.Company;
    import com.the.basic.tech.info.activemq.models.Product;

    @SpringBootApplication
    public class SpringActiveMqTopicProducerApplication implements CommandLineRunner {

        @Autowired
        JmsPublisher publisher;

        public static void main(String[] args) {
            SpringApplication.run(SpringActiveMqTopicProducerApplication.class, args);
        }

        @Override
        public void run(String... args) throws Exception {
            /*
             * Apple company & products
             */
            // initial company and products
            Product iphone7 = new Product("Iphone 7");
            Product iPadPro = new Product("IPadPro");

            List<Product> appleProducts = new ArrayList<Product>(Arrays.asList(iphone7, iPadPro));

            Company apple = new Company("Apple", appleProducts);

            // send message to ActiveMQ
            publisher.send(apple);

            /*
             * Samsung company and products
             */
            Product galaxyS8 = new Product("Galaxy S8");
            Product gearS3 = new Product("Gear S3");

            List<Product> samsungProducts = new ArrayList<Product>(Arrays.asList(galaxyS8, gearS3));

            Company samsung = new Company("Samsung", samsungProducts);

            /*
             * send message to ActiveMQ
             */
            publisher.send(samsung);
        }
    }

ConnectionFactoryConfig

    package com.the.basic.tech.info.activemq.config;

    import javax.jms.ConnectionFactory;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
    import org.springframework.jms.support.converter.MessageConverter;
    import org.springframework.jms.support.converter.MessageType;

    @Configuration
    @EnableJms
    public class ConnectionFactoryConfig {
        @Value("${jsa.activemq.broker.url}")
        String brokerUrl;

        @Value("${jsa.activemq.borker.username}")
        String userName;

        @Value("${jsa.activemq.borker.password}")
        String password;

        /*
         * Initial ConnectionFactory
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(brokerUrl);
            connectionFactory.setUserName(userName);
            connectionFactory.setPassword(password);
            return connectionFactory;
        }

        @Bean // Serialize message content to json using TextMessage
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }

        /*
         * Used for sending Messages.
         */
        @Bean
        public JmsTemplate jmsTemplate() {
            JmsTemplate template = new JmsTemplate();
            template.setConnectionFactory(connectionFactory());
            template.setMessageConverter(jacksonJmsMessageConverter());
            template.setPubSubDomain(true);
            return template;
        }
    }

Model Layer

Company.java

    package com.the.basic.tech.info.activemq.models;

    import java.util.List;

    import org.json.JSONArray;
    import org.json.JSONException;
    import org.json.JSONObject;

    import com.fasterxml.jackson.annotation.JsonIdentityInfo;
    import com.fasterxml.jackson.annotation.ObjectIdGenerators;

    @JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Company.class)
    public class Company {
        private String name;

        private List<Product> products;

        public Company() {
        }

        public Company(String name, List<Product> products) {
            this.name = name;
            this.products = products;
        }

        // name
        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // products
        public void setProducts(List<Product> products) {
            this.products = products;
        }

        public List<Product> getProducts() {
            return this.products;
        }

        /**
         * Show Detail View: renders the company and its product names as JSON.
         */
        @Override
        public String toString() {
            JSONObject jsonInfo = new JSONObject();

            try {
                jsonInfo.put("name", this.name);

                JSONArray productArray = new JSONArray();
                if (this.products != null) {
                    this.products.forEach(product -> {
                        JSONObject subJson = new JSONObject();
                        try {
                            subJson.put("name", product.getName());
                        } catch (JSONException e) {
                            // ignored: the product is added without a name
                        }

                        productArray.put(subJson);
                    });
                }
                jsonInfo.put("products", productArray);
            } catch (JSONException e1) {
                // ignored: return whatever was built so far
            }
            return jsonInfo.toString();
        }
    }

Product.java

    package com.the.basic.tech.info.activemq.models;

    import com.fasterxml.jackson.annotation.JsonIdentityInfo;
    import com.fasterxml.jackson.annotation.ObjectIdGenerators;

    @JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Product.class)
    public class Product {
        private String name;

        private Company company;

        public Product() {
        }

        public Product(String name) {
            this.name = name;
        }

        public Product(String name, Company company) {
            this.name = name;
            this.company = company;
        }

        // name
        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // company
        public void setCompany(Company company) {
            this.company = company;
        }

        public Company getCompany() {
            return this.company;
        }
    }

JMS Publisher (JmsPublisher.java)

This component publishes messages to the ActiveMQ topic.

    package com.the.basic.tech.info.activemq.jms;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;

    import com.the.basic.tech.info.activemq.models.Company;

    @Component
    public class JmsPublisher {
        private static final Logger logger = LoggerFactory.getLogger(JmsPublisher.class);

        @Autowired
        JmsTemplate jmsTemplate;

        @Value("${jsa.activemq.topic}")
        String topic;

        public void send(Company apple) {
            jmsTemplate.convertAndSend(topic, apple);
            logger.info("Message : {} published to topic: {} successfully.", apple.toString(), topic);
        }
    }

Spring Boot Consumer: SpringActiveMqTopicConsumer

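In this application, we will consume messages from the topic.

Project Structure

The project consists of pom.xml, application.properties, the application runner, ConnectionFactoryConfig, the Company and Product models, and JmsSubcriber, each listed in the sections that follow.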

Maven File (pom.xml)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example.activemq</groupId>
        <artifactId>SpringActiveMqTopicConsumer</artifactId>
        <version>0.0.1</version>
        <packaging>jar</packaging>

        <name>SpringActiveMqTopicConsumer</name>
        <description>SpringActiveMqTopicConsumer</description>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.4</version>
            <relativePath /> <!-- lookup parent from repository -->
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>

            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>

            <dependency>
                <groupId>org.json</groupId>
                <artifactId>json</artifactId>
                <version>20210307</version>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

Application Configuration (application.properties)

    jsa.activemq.broker.url=tcp://localhost:61616
    jsa.activemq.borker.username=admin
    jsa.activemq.borker.password=admin
    jsa.activemq.topic=jsa-topic
    spring.jms.pub-sub-domain=true

Application Runner: SpringActiveMqTopicConsumerApplication

    package com.the.basic.tech.info.activemq;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class SpringActiveMqTopicConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringActiveMqTopicConsumerApplication.class, args);
        }
    }

ConnectionFactoryConfig

    package com.the.basic.tech.info.activemq.config;

    import javax.jms.ConnectionFactory;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
    import org.springframework.jms.support.converter.MessageConverter;
    import org.springframework.jms.support.converter.MessageType;

    @Configuration
    public class ConnectionFactoryConfig {
        @Value("${jsa.activemq.broker.url}")
        String brokerUrl;

        @Value("${jsa.activemq.borker.username}")
        String userName;

        @Value("${jsa.activemq.borker.password}")
        String password;

        /*
         * Initial ConnectionFactory
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(brokerUrl);
            connectionFactory.setUserName(userName);
            connectionFactory.setPassword(password);
            return connectionFactory;
        }

        @Bean // Serialize message content to json using TextMessage
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }

        // A listener picks up this factory only when it sets
        // containerFactory = "jsaFactory" on @JmsListener.
        @Bean
        public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
                                                        DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            // Apply Spring Boot's defaults first, then the explicit overrides.
            configurer.configure(factory, connectionFactory);
            factory.setPubSubDomain(true);
            factory.setMessageConverter(jacksonJmsMessageConverter());
            return factory;
        }
    }

Model Layer

Company.java

    package com.the.basic.tech.info.activemq.models;

    import java.util.List;

    import com.fasterxml.jackson.annotation.JsonIdentityInfo;
    import com.fasterxml.jackson.annotation.ObjectIdGenerators;

    @JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Company.class)
    public class Company {
        private String name;
        private List<Product> products;

        public Company() {
        }

        public Company(String name, List<Product> products) {
            this.name = name;
            this.products = products;
        }

        // name
        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // products
        public void setProducts(List<Product> products) {
            this.products = products;
        }

        public List<Product> getProducts() {
            return this.products;
        }

        @Override
        public String toString() {
            return "Company [name=" + name + ", products=" + products + "]";
        }
    }

Product.java

    package com.the.basic.tech.info.activemq.models;

    import com.fasterxml.jackson.annotation.JsonIdentityInfo;
    import com.fasterxml.jackson.annotation.ObjectIdGenerators;

    @JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Product.class)
    public class Product {
        private String name;

        private Company company;

        public Product() {
        }

        public Product(String name) {
            this.name = name;
        }

        public Product(String name, Company company) {
            this.name = name;
            this.company = company;
        }

        // name
        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // company
        public void setCompany(Company company) {
            this.company = company;
        }

        public Company getCompany() {
            return this.company;
        }

        @Override
        public String toString() {
            return "Product [name=" + name + ", company=" + company + "]";
        }
    }

JMS Subscriber (JmsSubcriber.java)

This component consumes messages from the ActiveMQ topic.

    package com.the.basic.tech.info.activemq.jms;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;

    import com.the.basic.tech.info.activemq.models.Company;

    @Component
    public class JmsSubcriber {
        private static final Logger logger = LoggerFactory.getLogger(JmsSubcriber.class);

        @JmsListener(destination = "${jsa.activemq.topic}")
        public void receive(Company msg) {
            logger.info("Recieved Message: {}", msg.toString());
        }
    }

Spring Boot JMS: Application Logs

JmsPublisher Logs:

    2021-06-08 20:52:36.628  INFO 20636 --- [           main] a.SpringActiveMqTopicProducerApplication : No active profile set, falling back to default profiles: default
    2021-06-08 20:52:38.079  INFO 20636 --- [           main] a.SpringActiveMqTopicProducerApplication : Started SpringActiveMqTopicProducerApplication in 2.611 seconds (JVM running for 3.531)
    2021-06-08 20:52:38.671  INFO 20636 --- [           main] c.t.b.t.info.activemq.jms.JmsPublisher   : Message : {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]} published to topic: jsa-topic successfully.
    2021-06-08 20:52:38.703  INFO 20636 --- [           main] c.t.b.t.info.activemq.jms.JmsPublisher   : Message : {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]} published to topic: jsa-topic successfully.

Jms Subscriber Logs:

    2021-06-08 20:54:13.700  INFO 2864 --- [           main] a.SpringActiveMqTopicConsumerApplication : No active profile set, falling back to default profiles: default
    2021-06-08 20:54:15.137  INFO 2864 --- [           main] a.SpringActiveMqTopicConsumerApplication : Started SpringActiveMqTopicConsumerApplication in 2.208 seconds (JVM running for 2.93)
    2021-06-08 20:54:33.900  INFO 2864 --- [ntContainer#0-1] c.t.b.t.info.activemq.jms.JmsSubcriber   : Recieved Message: Company [name=Apple, products=[Product [name=Iphone 7, company=null], Product [name=IPadPro, company=null]]]
    2021-06-08 20:54:33.916  INFO 2864 --- [ntContainer#0-1] c.t.b.t.info.activemq.jms.JmsSubcriber   : Recieved Message: Company [name=Samsung, products=[Product [name=Galaxy S8, company=null], Product [name=Gear S3, company=null]]]


Have a great day ahead 🙂


Source: https://thebasictechinfo.com/interview-preparation/spring-boot-jms-activemq-publish-subscribe-pattern-example/
