From cc14ea26aea3f9e41409c04f4e0dcaba0c5b5186 Mon Sep 17 00:00:00 2001 From: CleverChuk Date: Thu, 8 Jul 2021 22:50:08 -0400 Subject: [PATCH 1/2] This change is to avoid propagating kafka error to the application rather always use the fallback on any kafka failure. There's no point in having a fallback if the main appender kills the application on failure --- .../delivery/AsynchronousDeliveryStrategy.java | 11 ++++------- .../delivery/AsynchronousDeliveryStrategyTest.java | 13 +++++++++++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java index 5739dd5..daf1ee6 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java @@ -16,16 +16,13 @@ public class AsynchronousDeliveryStrategy implements DeliveryStrategy { public boolean send(Producer producer, ProducerRecord record, final E event, final FailedDeliveryCallback failedDeliveryCallback) { try { - producer.send(record, new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - failedDeliveryCallback.onFailedDelivery(event, exception); - } + producer.send(record, (metadata, exception) -> { + if (exception != null) { + failedDeliveryCallback.onFailedDelivery(event, exception); } }); return true; - } catch (BufferExhaustedException | TimeoutException e) { + } catch (Exception e) { failedDeliveryCallback.onFailedDelivery(event, e); return false; } diff --git a/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java b/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java index 5becb31..f23be9a 100644 --- a/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java +++ b/src/test/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategyTest.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Test; @@ -74,4 +75,16 @@ public void testCallbackWillTriggerOnFailedDeliveryOnProducerSendTimeout() { verify(failedDeliveryCallback).onFailedDelivery(eq("msg"), same(exception)); } + @Test + public void testCallbackWillTriggerOnFailedDeliveryOnAnyError() { + final Exception exception = new KafkaException("miau"); + final ProducerRecord record = new ProducerRecord("topic", 0, null, "msg"); + + when(producer.send(same(record), any(Callback.class))).thenThrow(exception); + + unit.send(producer, record, "msg", failedDeliveryCallback); + + verify(failedDeliveryCallback).onFailedDelivery(eq("msg"), same(exception)); + } + } From 25ea77102b9587d6306151a5844287a1fcd97dd8 Mon Sep 17 00:00:00 2001 From: CleverChuk Date: Fri, 3 Sep 2021 09:03:12 -0400 Subject: [PATCH 2/2] Update src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java Co-authored-by: Daniel Wegener --- .../logback/kafka/delivery/AsynchronousDeliveryStrategy.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java index daf1ee6..738118c 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java @@ -23,6 +23,9 @@ public boolean send(Producer producer, ProducerRecord reco }); return true; } catch (Exception e) { + if (e instanceof org.apache.kafka.common.errors.InterruptException) { + Thread.currentThread().interrupt(); + } failedDeliveryCallback.onFailedDelivery(event, e); return false; }