From f239d534721740dd58d2976185da7e1ad494088a Mon Sep 17 00:00:00 2001 From: Mohamed Mohamed Date: Thu, 21 Mar 2024 17:29:50 -0400 Subject: [PATCH] GROUP-101 Updated event consumer to drop errors, added tests to verify --- config/pmd/design.xml | 2 +- .../group/event/GroupEventForwarder.java | 26 ++++--- .../group/sync/GroupUpdateService.java | 19 ++++-- .../domain/outbox/OutboxEvent.java | 4 ++ .../group/domain/OutboxEventTest.java | 14 ++++ .../GroupEventForwarderIntegrationTest.java | 9 ++- .../group/event/GroupEventForwarderTest.java | 9 +-- .../group/sync/GroupUpdateServiceTest.java | 16 +++-- .../grouphq/groupsync/GroupTestUtility.java | 68 ++++++++++++++++++- 9 files changed, 132 insertions(+), 35 deletions(-) diff --git a/config/pmd/design.xml b/config/pmd/design.xml index 8aec966..4fdb1f2 100644 --- a/config/pmd/design.xml +++ b/config/pmd/design.xml @@ -45,7 +45,7 @@ - + diff --git a/src/main/java/org/grouphq/groupsync/group/event/GroupEventForwarder.java b/src/main/java/org/grouphq/groupsync/group/event/GroupEventForwarder.java index 26c6c4a..3279d7e 100644 --- a/src/main/java/org/grouphq/groupsync/group/event/GroupEventForwarder.java +++ b/src/main/java/org/grouphq/groupsync/group/event/GroupEventForwarder.java @@ -26,25 +26,23 @@ public GroupEventForwarder(GroupUpdateService groupUpdateService) { @Bean public Consumer> processedEvents() { return outboxEvents -> - outboxEvents.flatMap(this::forwardUpdate) - .doOnError(throwable -> log.error("Error while forwarding events. " - + "Attempting to resume. Error: {}", throwable.getMessage())) - .onErrorResume(throwable -> Flux.empty()) + outboxEvents.flatMap(outboxEvent -> + forwardUpdate(outboxEvent) + .doOnError(throwable -> log.error("Error while forwarding events. " + + "Attempting to resume. Error: {}", throwable.getMessage())) + .onErrorResume(throwable -> Mono.empty()) + ) .subscribe(); } private Mono forwardUpdate(OutboxEvent outboxEvent) { - switch (outboxEvent.getEventStatus()) { - case SUCCESSFUL -> { - groupUpdateService.sendPublicOutboxEventToAll( - PublicOutboxEvent.convertOutboxEvent(outboxEvent)); - groupUpdateService.sendOutboxEventToEventOwner(OutboxEvent.convertEventDataToPublic(outboxEvent)); - } + return Mono.defer(() -> switch (outboxEvent.getEventStatus()) { + case SUCCESSFUL -> + groupUpdateService.sendPublicOutboxEventToAll(PublicOutboxEvent.convertOutboxEvent(outboxEvent)) + .then(groupUpdateService.sendOutboxEventToEventOwner( + OutboxEvent.convertEventDataToPublic(outboxEvent))); case FAILED -> groupUpdateService .sendOutboxEventToEventOwner(OutboxEvent.convertEventDataToPublic(outboxEvent)); - default -> log.error("Unknown event status: {}", outboxEvent.getEventStatus()); - } - - return Mono.empty(); + }); } } diff --git a/src/main/java/org/grouphq/groupsync/group/sync/GroupUpdateService.java b/src/main/java/org/grouphq/groupsync/group/sync/GroupUpdateService.java index c403d57..59a9559 100644 --- a/src/main/java/org/grouphq/groupsync/group/sync/GroupUpdateService.java +++ b/src/main/java/org/grouphq/groupsync/group/sync/GroupUpdateService.java @@ -7,6 +7,7 @@ import org.springframework.stereotype.Service; import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; /** @@ -36,14 +37,20 @@ public Flux eventOwnerUpdateStream() { .onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST); } - public void sendPublicOutboxEventToAll(PublicOutboxEvent outboxEvent) { - final Sinks.EmitResult result = publicUpdatesSink.tryEmitNext(outboxEvent); - emitResultLogger("PUBLIC", outboxEvent, result); + public Mono sendPublicOutboxEventToAll(PublicOutboxEvent outboxEvent) { + return Mono.defer(() -> + Mono.just(publicUpdatesSink.tryEmitNext(outboxEvent)) + .doOnNext(result -> emitResultLogger("PUBLIC", outboxEvent, result)) + .then() + ); } - public void sendOutboxEventToEventOwner(OutboxEvent outboxEvent) { - final Sinks.EmitResult result = userUpdatesSink.tryEmitNext(outboxEvent); - emitResultLogger(outboxEvent.getEventStatus().toString(), outboxEvent, result); + public Mono sendOutboxEventToEventOwner(OutboxEvent outboxEvent) { + return Mono.defer(() -> + Mono.just(userUpdatesSink.tryEmitNext(outboxEvent)) + .doOnNext(result -> emitResultLogger("USER", outboxEvent, result)) + .then() + ); } private void emitResultLogger(String eventName, diff --git a/src/main/java/org/grouphq/groupsync/groupservice/domain/outbox/OutboxEvent.java b/src/main/java/org/grouphq/groupsync/groupservice/domain/outbox/OutboxEvent.java index e6037cb..a1efea4 100644 --- a/src/main/java/org/grouphq/groupsync/groupservice/domain/outbox/OutboxEvent.java +++ b/src/main/java/org/grouphq/groupsync/groupservice/domain/outbox/OutboxEvent.java @@ -49,6 +49,10 @@ public static OutboxEvent of(UUID eventId, Long aggregateId, AggregateType aggre } public static OutboxEvent convertEventDataToPublic(OutboxEvent outboxEvent) { + if (outboxEvent.getEventStatus() == EventStatus.FAILED) { + return outboxEvent; + } + return switch (outboxEvent.getEventType()) { case MEMBER_JOINED, MEMBER_LEFT -> convertMember(outboxEvent); default -> outboxEvent; diff --git a/src/test/java/org/grouphq/groupsync/group/domain/OutboxEventTest.java b/src/test/java/org/grouphq/groupsync/group/domain/OutboxEventTest.java index 182e4fe..e7a2aa4 100644 --- a/src/test/java/org/grouphq/groupsync/group/domain/OutboxEventTest.java +++ b/src/test/java/org/grouphq/groupsync/group/domain/OutboxEventTest.java @@ -4,7 +4,9 @@ import org.grouphq.groupsync.GroupTestUtility; import org.grouphq.groupsync.groupservice.domain.members.Member; +import org.grouphq.groupsync.groupservice.domain.outbox.ErrorData; import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent; +import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus; import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventType; import org.grouphq.groupsync.groupservice.web.objects.egress.PublicMember; import org.junit.jupiter.api.DisplayName; @@ -35,4 +37,16 @@ void convertsOutboxEventMemberEventDataForMemberLeftToPublicType() { assertThat(convertedOutboxEvent.getEventData()).isExactlyInstanceOf(PublicMember.class); } + + @Test + @DisplayName("Does not perform any conversion for failed events") + void doesNotPerformAnyConversionForFailedEvents() { + final ErrorData errorData = new ErrorData("Error message"); + final OutboxEvent outboxEvent = GroupTestUtility.generateOutboxEvent( + 1L, errorData, EventType.MEMBER_JOINED, EventStatus.FAILED); + + final OutboxEvent nonConvertedEvent = OutboxEvent.convertEventDataToPublic(outboxEvent); + + assertThat(nonConvertedEvent).isEqualTo(outboxEvent); + } } diff --git a/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderIntegrationTest.java b/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderIntegrationTest.java index f4e0cce..acde2cc 100644 --- a/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderIntegrationTest.java +++ b/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderIntegrationTest.java @@ -116,9 +116,12 @@ void forwardsPrivateMemberEventsToUserWithPublicEventDataModels() { @DisplayName("Forwards events to the outbox event update failed sink") void forwardsEventsToTheOutboxEventUpdateFailedSink() { final List outboxEvents = List.of( - GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED), - GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED), - GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED) + GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.GROUP_CREATED), + GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.GROUP_UPDATED), + GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.GROUP_DISBANDED), + GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.MEMBER_JOINED), + GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.MEMBER_LEFT), + GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.NOTHING) ); final Flux groupUpdatesStream = diff --git a/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderTest.java b/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderTest.java index 551312b..27a2de2 100644 --- a/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderTest.java +++ b/src/test/java/org/grouphq/groupsync/group/event/GroupEventForwarderTest.java @@ -1,6 +1,6 @@ package org.grouphq.groupsync.group.event; -import static org.mockito.BDDMockito.willDoNothing; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.verify; import org.grouphq.groupsync.GroupTestUtility; @@ -16,6 +16,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Tag("UnitTest") @ExtendWith(MockitoExtension.class) @@ -35,8 +36,8 @@ void forwardsSuccessfulEventsToTheUpdatesSink() { final PublicOutboxEvent publicOutboxEvent = PublicOutboxEvent.convertOutboxEvent(outboxEvent); - willDoNothing().given(groupUpdateService).sendPublicOutboxEventToAll(publicOutboxEvent); - willDoNothing().given(groupUpdateService).sendOutboxEventToEventOwner(outboxEvent); + given(groupUpdateService.sendPublicOutboxEventToAll(publicOutboxEvent)).willReturn(Mono.empty()); + given(groupUpdateService.sendOutboxEventToEventOwner(outboxEvent)).willReturn(Mono.empty()); groupEventForwarder.processedEvents().accept(Flux.just(outboxEvent)); @@ -50,7 +51,7 @@ void forwardsFailedEventsToTheUpdatesFailedSink() { final OutboxEvent outboxEvent = GroupTestUtility.generateOutboxEvent("ID", EventStatus.FAILED); - willDoNothing().given(groupUpdateService).sendOutboxEventToEventOwner(outboxEvent); + given(groupUpdateService.sendOutboxEventToEventOwner(outboxEvent)).willReturn(Mono.empty()); groupEventForwarder.processedEvents().accept(Flux.just(outboxEvent)); diff --git a/src/test/java/org/grouphq/groupsync/group/sync/GroupUpdateServiceTest.java b/src/test/java/org/grouphq/groupsync/group/sync/GroupUpdateServiceTest.java index 823db17..6817aea 100644 --- a/src/test/java/org/grouphq/groupsync/group/sync/GroupUpdateServiceTest.java +++ b/src/test/java/org/grouphq/groupsync/group/sync/GroupUpdateServiceTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @Tag("UnitTest") @@ -36,8 +37,11 @@ void updatesSinkWithNewOutboxEventsAndEmitsThem() { final PublicOutboxEvent publicOutboxEvent = PublicOutboxEvent.convertOutboxEvent(outboxEvent); - StepVerifier.create(groupUpdateService.publicUpdatesStream()) - .then(() -> groupUpdateService.sendPublicOutboxEventToAll(publicOutboxEvent)) + StepVerifier.create(groupUpdateService.publicUpdatesStream() + .publishOn(Schedulers.boundedElastic()) + .doOnSubscribe(subscription -> + groupUpdateService.sendPublicOutboxEventToAll(publicOutboxEvent).subscribe()) + ) .expectNext(publicOutboxEvent) .thenCancel() .verify(Duration.ofSeconds(1)); @@ -56,8 +60,12 @@ void updatesSinkWithFailedOutboxEventsAndEmitsThem() { final OutboxEvent outboxEvent = GroupTestUtility.generateOutboxEvent("ID", EventStatus.FAILED); - StepVerifier.create(groupUpdateService.eventOwnerUpdateStream()) - .then(() -> groupUpdateService.sendOutboxEventToEventOwner(outboxEvent)) + StepVerifier.create( + groupUpdateService.eventOwnerUpdateStream() + .publishOn(Schedulers.boundedElastic()) + .doOnSubscribe(subscription -> + groupUpdateService.sendOutboxEventToEventOwner(outboxEvent).subscribe()) + ) .expectNext(outboxEvent) .thenCancel() .verify(Duration.ofSeconds(1)); diff --git a/src/testFixtures/java/org/grouphq/groupsync/GroupTestUtility.java b/src/testFixtures/java/org/grouphq/groupsync/GroupTestUtility.java index cfaed08..b531dba 100644 --- a/src/testFixtures/java/org/grouphq/groupsync/GroupTestUtility.java +++ b/src/testFixtures/java/org/grouphq/groupsync/GroupTestUtility.java @@ -9,6 +9,7 @@ import org.grouphq.groupsync.groupservice.domain.groups.GroupStatus; import org.grouphq.groupsync.groupservice.domain.members.Member; import org.grouphq.groupsync.groupservice.domain.members.MemberStatus; +import org.grouphq.groupsync.groupservice.domain.outbox.ErrorData; import org.grouphq.groupsync.groupservice.domain.outbox.EventDataModel; import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent; import org.grouphq.groupsync.groupservice.domain.outbox.enums.AggregateType; @@ -134,7 +135,7 @@ public static Group generateFullGroupDetailsWithMembers(GroupStatus status) { final Set members = new HashSet<>(); for (int i = 0; i < maxCapacity / 2; i++) { - final Member member = generateFullMemberDetails(faker.name().username(), groupId); + final Member member = generateFullMemberDetails(faker.name().firstName(), groupId); final PublicMember publicMember = Member.toPublicMember(member); members.add(publicMember); } @@ -435,8 +436,9 @@ public static OutboxEvent generateOutboxEvent() { } /** - * Generates an outbox event with the given EventDataModel. + * Generates an outbox event with the given parameters. * + * @param aggregateId the aggregate ID to use * @param eventData the event data to use * @param eventType the event type to use * @@ -457,11 +459,42 @@ public static OutboxEvent generateOutboxEvent( ); } + /** + * Generates an outbox event with the given parameters. + * + * @param aggregateId the aggregate ID to use + * @param eventData the event data to use + * @param eventType the event type to use + * @param eventStatus the event status to use + * + * @return an OutboxEvent object with all details. + */ + public static OutboxEvent generateOutboxEvent( + Long aggregateId, EventDataModel eventData, EventType eventType, EventStatus eventStatus) { + + return new OutboxEvent( + UUID.randomUUID(), + aggregateId, + UUID.randomUUID().toString(), + AggregateType.GROUP, + eventType, + eventData, + eventStatus, + Instant.now() + ); + } + /** * Overloaded method for {@link #generateOutboxEvent()} ()}. */ public static OutboxEvent generateOutboxEvent(String webSocketId, EventStatus eventStatus) { - final EventDataModel eventData = GroupTestUtility.generateFullGroupDetails(GroupStatus.ACTIVE); + EventDataModel eventData; + + if (eventStatus == EventStatus.FAILED) { + eventData = new ErrorData("Error message"); + } else { + eventData = GroupTestUtility.generateFullGroupDetails(GroupStatus.ACTIVE); + } return new OutboxEvent( UUID.randomUUID(), @@ -474,4 +507,33 @@ public static OutboxEvent generateOutboxEvent(String webSocketId, EventStatus ev Instant.now() ); } + + /** + * Overloaded method for {@link #generateOutboxEvent()} ()}. + */ + public static OutboxEvent generateOutboxEvent(String webSocketId, EventStatus eventStatus, EventType eventType) { + EventDataModel eventData; + + if (eventStatus == EventStatus.FAILED) { + eventData = new ErrorData("Error message"); + } else { + eventData = switch (eventType) { + case GROUP_CREATED -> GroupTestUtility.generateFullGroupDetails(GroupStatus.ACTIVE); + case GROUP_UPDATED, GROUP_DISBANDED -> GroupTestUtility.generateFullGroupDetails(GroupStatus.DISBANDED); + case MEMBER_JOINED, MEMBER_LEFT -> GroupTestUtility.generateFullMemberDetails(); + case NOTHING -> null; + }; + } + + return new OutboxEvent( + UUID.randomUUID(), + FAKER.number().randomNumber(12, true), + webSocketId, + AggregateType.GROUP, + eventType, + eventData, + eventStatus, + Instant.now() + ); + } }