Thursday, 29 August 2019

Kafka listener ErrorHandler


You can set a built-in error handler on the listener container factory:

ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new SeekToCurrentErrorHandler());

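This is a built-in error handler. When the listener fails, it seeks the consumer back to the failed record so that the record is redelivered on the next poll. In recent Spring Kafka versions, once the same record has failed the configured maximum number of times, the handler moves the pointer past it, so your listener will not receive that record again.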
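One way to picture this behaviour is the toy model below. It is only a sketch in plain Java, not the Spring Kafka class: the class name `SeekToCurrentModel`, the `maxFailures` parameter, and the in-memory list standing in for a partition are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of "seek back, redeliver, skip after N failures". Hypothetical, not a Spring class. */
final class SeekToCurrentModel {

    /** Feeds the log to the listener and returns the records it processed successfully. */
    static List<String> consume(List<String> log, int maxFailures, Consumer<String> listener) {
        List<String> processed = new ArrayList<>();
        int position = 0; // next offset to read, like the consumer position
        int failures = 0; // consecutive failures at the current offset
        while (position < log.size()) {
            String record = log.get(position);
            try {
                listener.accept(record);
                processed.add(record);
                position++;   // success: move on to the next record
                failures = 0;
            } catch (RuntimeException e) {
                failures++;
                if (failures >= maxFailures) {
                    position++;   // give up: skip past the failing record
                    failures = 0;
                }
                // otherwise the position is unchanged, so the record is redelivered
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        List<String> ok = consume(Arrays.asList("a", "bad", "c"), 3, r -> {
            if (r.equals("bad")) {
                attempts[0]++;
                throw new IllegalStateException("cannot process " + r);
            }
        });
        // prints: [a, c] after 3 attempts on the bad record
        System.out.println(ok + " after " + attempts[0] + " attempts on the bad record");
    }
}
```

The point of the model is that a failing record is retried in place rather than silently dropped, and is only skipped once the failure limit is reached.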



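You can also do this yourself, by parsing the topic, partition and offset out of the deserialization error message and seeking past the bad record: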

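// The error message has the form:
// "Error deserializing key/value for partition <topic>-<partition> at offset <offset>. If needed, please seek past the record to continue consumption."
// The fragments below are used to cut the topic, partition and offset out of it.
private static final String ERROR_MESSAGE_START = "Error deserializing key/value for partition ";
private static final String ERROR_MESSAGE_END = ". If needed, please seek past the record to continue consumption.";
private static final String ERROR_MESSAGE_TOPICS_SPLITOR = "-";
private static final String ERROR_MESSAGE_PARTITIOIN_SPLITOR = " at";
private static final String ERROR_MESSAGE_OFFSET_SPLITOR = "offset ";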


factory.setErrorHandler(new ErrorHandler() {

    // Variant that receives the consumer and container: log, then skip the bad record.
    @Override
    public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
            MessageListenerContainer container) {
        if (e != null) {
            logger.error("PO listener could not process message.", e);
            handleDeserializingError(e, consumer);
        }
    }

    // No consumer is available in this variant, so there is nothing to seek.
    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?, ?> consumer) {
        if (e != null) {
            logger.error("PO listener could not process message.", e);
            handleDeserializingError(e, consumer);
        }
    }

    private void handleDeserializingError(Exception e, Consumer<?, ?> consumer) {
        String message = e.getMessage(); // may be null for some exceptions
        if (message != null && message.contains(ERROR_MESSAGE_START)) {
            // s looks like "<topic>-<partition> at offset <offset>"
            String s = message.split(ERROR_MESSAGE_START)[1].split(ERROR_MESSAGE_END)[0];
            String topicAndPartition = s.split(ERROR_MESSAGE_PARTITIOIN_SPLITOR)[0];
            // Topic names may themselves contain '-', so split on the last one only.
            int dash = topicAndPartition.lastIndexOf(ERROR_MESSAGE_TOPICS_SPLITOR);
            String topics = topicAndPartition.substring(0, dash);
            int partition = Integer.parseInt(topicAndPartition.substring(dash + 1));
            // Kafka offsets are 64-bit, so parse as long rather than int.
            long offset = Long.parseLong(s.split(ERROR_MESSAGE_OFFSET_SPLITOR)[1].trim());

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            logger.info("Skipping {}-{} offset {} ", topics, partition, offset);
            consumer.seek(topicPartition, offset + 1); // move past the bad record
            consumer.commitSync();                     // persist the new position
        }
        // errorHandlerLatch is declared elsewhere in the enclosing class (not shown).
        errorHandlerLatch.countDown();
    }

});
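The parsing step can be tried on its own, without a broker. The following is a minimal sketch that uses a regular expression instead of chained `split` calls. The class name `FailedRecordPosition` and the topic `purchase-orders` are made up for the example, and it assumes the message keeps the form shown in the constants above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls topic, partition and offset out of the error text. */
final class FailedRecordPosition {

    // Assumed shape: "... for partition <topic>-<partition> at offset <offset>. ..."
    // The greedy (.+) keeps any '-' inside the topic name with the topic.
    private static final Pattern POSITION = Pattern.compile(
            "Error deserializing key/value for partition (.+)-(\\d+) at offset (\\d+)");

    final String topic;
    final int partition;
    final long offset;

    private FailedRecordPosition(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    /** Returns the parsed position, or empty if the message is null or has another shape. */
    static Optional<FailedRecordPosition> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = POSITION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new FailedRecordPosition(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        String msg = "Error deserializing key/value for partition purchase-orders-3 at offset 42. "
                + "If needed, please seek past the record to continue consumption.";
        // prints: purchase-orders / 3 / seek to 43
        parse(msg).ifPresent(p ->
                System.out.println(p.topic + " / " + p.partition + " / seek to " + (p.offset + 1)));
    }
}
```

Returning an `Optional` keeps the "not a deserialization error" case explicit, so the caller only seeks when a position was actually found.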





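Note: if your ack mode is manual, make sure you call consumer.commitSync() after the seek, as the handler above does. Otherwise the new position may never be committed, and the bad record can come back after a restart or rebalance.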

https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han

