Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 48 additions & 48 deletions partials/KafkaPublisherImpl.java
Original file line number Diff line number Diff line change
@@ -1,69 +1,69 @@
{# Renders a Spring Kafka publisher implementation for every channel the
   application publishes to (AsyncAPI "subscribe" operations).
   Params:
     asyncapi - the parsed AsyncAPI document
     params   - generator parameters (userJavaPackage, springBoot2, ...) #}
{% macro kafkaPublisherImpl(asyncapi, params) %}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}
import javax.annotation.processing.Generated;
import java.util.HashMap;
import java.util.Map;

@Generated(value = "com.asyncapi.generator.template.spring", date = "{{''|currentTime }}")
@Service
public class PublisherServiceImpl implements PublisherService {

    @Autowired
    private KafkaTemplate<Integer, Object> kafkaTemplate;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- set hasParameters = channel.hasParameters() %}
{%- set methodName = channel.subscribe().id() | camelCase %}
{%- if channel.subscribe().hasMultipleMessages() %}
{%- set varName = "object" %}
{%- else %}
{%- set varName = channel.subscribe().message().payload().uid() | camelCase %}
{%- endif %}
{% if channel.description() or channel.subscribe().description() %}    /**{% for line in channel.description() | splitByLines %}
     * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %}
     * {{line | safe}}{% endfor %}
     */{% endif %}
    public void {{methodName}}(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) {
        Message<{{varName | upperFirst}}> message = MessageBuilder.withPayload({{varName}})
                .setHeader(KafkaHeaders.TOPIC, get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}))
                // Spring Boot 2 / spring-kafka 2.x uses MESSAGE_KEY; 3.x renamed it to KEY.
                .setHeader(KafkaHeaders.{%- if params.springBoot2 %}MESSAGE_KEY{% else %}KEY{% endif -%}, key)
                .build();
        kafkaTemplate.send(message);
    }

    // Resolves the concrete topic name for this channel, substituting any
    // channel parameters into the AsyncAPI channel-name template.
    private String get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) {
        Map<String, String> parameters = {% if hasParameters %}new HashMap<>(){% else %}null{% endif %};
{%- if hasParameters %}
{%- for parameterName, parameter in channel.parameters() %}
        parameters.put("{{parameterName}}", {{parameterName | camelCase}}{% if parameter.schema().type() !== 'string'%}.toString(){% endif %});
{%- endfor %}
{%- endif %}
        return replaceParameters("{{channelName}}", parameters);
    }
{%- endif %}
{%- endfor %}

    /**
     * Replaces every {parameterName} placeholder in the topic template with
     * its runtime value. Returns the topic unchanged when there are no
     * parameters (parameters == null).
     */
    private String replaceParameters(String topic, Map<String, String> parameters) {
        if (parameters != null) {
            String compiledTopic = topic;
            for (String key : parameters.keySet()) {
                compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key));
            }
            return compiledTopic;
        }
        return topic;
    }

}
{% endmacro %}