Skip to content

Commit

Permalink
GROUP-89 Updated public update streams to fetch, maintain, and concat…
Browse files Browse the repository at this point in the history
… initial state
  • Loading branch information
makmn1 committed Mar 3, 2024
1 parent 63d932e commit 5780eb6
Show file tree
Hide file tree
Showing 34 changed files with 1,322 additions and 78 deletions.
4 changes: 3 additions & 1 deletion config/pmd/codestyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
<rule ref="category/java/codestyle.xml/MDBAndSessionBeanNamingConvention" />
<rule ref="category/java/codestyle.xml/MethodNamingConventions" />
<rule ref="category/java/codestyle.xml/NoPackage" />
<rule ref="category/java/codestyle.xml/OnlyOneReturn" />
<rule ref="category/java/codestyle.xml/PackageCase" />
<rule ref="category/java/codestyle.xml/PrematureDeclaration" />
<rule ref="category/java/codestyle.xml/RemoteInterfaceNamingConvention" />
Expand Down Expand Up @@ -88,4 +87,7 @@
<!-- Doesn't work well with Jackson @JsonIgnoreProperties(value = {"@type"}) for records / classes -->
<!-- <rule ref="category/java/codestyle.xml/UnnecessaryAnnotationValueElement" />-->


<!-- Sometimes it makes more semantic sense to return early, common for guard clauses -->
<!-- <rule ref="category/java/codestyle.xml/OnlyOneReturn" />-->
</ruleset>
6 changes: 3 additions & 3 deletions config/pmd/multithreading.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

<rule ref="category/java/multithreading.xml/AvoidSynchronizedAtMethodLevel" />
<rule ref="category/java/multithreading.xml/AvoidThreadGroup" />
<rule ref="category/java/multithreading.xml/AvoidUsingVolatile" />
<rule ref="category/java/multithreading.xml/DoNotUseThreads" />
<rule ref="category/java/multithreading.xml/DontCallThreadRun" />
<rule ref="category/java/multithreading.xml/DoubleCheckedLocking" />
<rule ref="category/java/multithreading.xml/NonThreadSafeSingleton" />
<rule ref="category/java/multithreading.xml/UnsynchronizedStaticFormatter" />
<rule ref="category/java/multithreading.xml/UseConcurrentHashMap" />
<rule ref="category/java/multithreading.xml/UseNotifyAllInsteadOfNotify" />

<!-- Does not allow constant, unmodifiable maps -->
<!-- <rule ref="category/java/multithreading.xml/UseConcurrentHashMap" />-->
<!-- Controversial rule -->
<!-- <rule ref="category/java/multithreading.xml/AvoidUsingVolatile" />-->
</ruleset>
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.time.Instant;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.outbox.EventDataModel;
Expand All @@ -23,7 +24,7 @@
* @param createdDate The date the event was created
*/
@Slf4j
public record PublicOutboxEvent(Long aggregateId, AggregateType aggregateType,
public record PublicOutboxEvent(UUID eventId, Long aggregateId, AggregateType aggregateType,
EventType eventType, EventDataModel eventData,
EventStatus eventStatus, Instant createdDate) {

Expand Down Expand Up @@ -56,6 +57,7 @@ private static PublicOutboxEvent convertMemberJoined(OutboxEvent outboxEvent) {
PublicOutboxEvent publicOutboxEvent;

publicOutboxEvent = new PublicOutboxEvent(
outboxEvent.getEventId(),
outboxEvent.getAggregateId(),
outboxEvent.getAggregateType(),
outboxEvent.getEventType(),
Expand All @@ -68,11 +70,24 @@ private static PublicOutboxEvent convertMemberJoined(OutboxEvent outboxEvent) {
}

private static PublicOutboxEvent convertMemberLeft(OutboxEvent outboxEvent) {
return convertDefault(outboxEvent);
PublicOutboxEvent publicOutboxEvent;

publicOutboxEvent = new PublicOutboxEvent(
outboxEvent.getEventId(),
outboxEvent.getAggregateId(),
outboxEvent.getAggregateType(),
outboxEvent.getEventType(),
Member.toPublicMember((Member) outboxEvent.getEventData()),
outboxEvent.getEventStatus(),
outboxEvent.getCreatedDate()
);

return publicOutboxEvent;
}

private static PublicOutboxEvent convertDefault(OutboxEvent outboxEvent) {
return new PublicOutboxEvent(
outboxEvent.getEventId(),
outboxEvent.getAggregateId(),
outboxEvent.getAggregateType(),
outboxEvent.getEventType(),
Expand All @@ -84,6 +99,7 @@ private static PublicOutboxEvent convertDefault(OutboxEvent outboxEvent) {

public PublicOutboxEvent withNewEventData(EventDataModel eventDataModel) {
return new PublicOutboxEvent(
this.eventId,
this.aggregateId(),
this.aggregateType(),
this.eventType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.grouphq.groupsync.group.sync;

import lombok.RequiredArgsConstructor;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.groupservice.domain.groups.Group;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.springframework.stereotype.Service;
Expand All @@ -20,6 +21,10 @@ public Flux<Group> getGroups() {
return groupServiceClient.getGroups();
}

public Flux<PublicOutboxEvent> getGroupsAsEvents() {
return groupServiceClient.getGroupsAsEvents();
}

public Mono<Member> getMyMember(String websocketId) {
return groupServiceClient.getMyMember(websocketId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.config.ClientProperties;
import org.grouphq.groupsync.group.domain.GroupServiceUnavailableException;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.groupservice.domain.groups.Group;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -39,10 +40,29 @@ public Flux<Group> getGroups() {
.retryWhen(
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds())))
.doOnError(throwable -> log.error("Group service failed on request to get groups", throwable))
.onErrorMap(throwable -> new GroupServiceUnavailableException(
"Group service failed on request to get groups"));
}

public Flux<PublicOutboxEvent> getGroupsAsEvents() {
return webClient
.get()
.uri("/api/groups/events")
.retrieve()
.bodyToFlux(PublicOutboxEvent.class)
.timeout(Duration.ofMillis(
clientProperties.getGroupsTimeoutMilliseconds()),
Flux.error(new GroupServiceUnavailableException(
"Group service timed out on request to get groups")))
.retryWhen(
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds())))
.doOnError(throwable -> log.error("Group service failed on request to get groups as events", throwable))
.onErrorMap(throwable -> new GroupServiceUnavailableException(
"Group service failed on request to get groups as events"));
}

public Mono<Member> getMyMember(String websocketId) {
return webClient
.get()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.grouphq.groupsync.group.sync.state;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.config.ClientProperties;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/**
* State representing when {@link GroupInitialStateService} is waiting for a client to trigger the initialization.
*/
@Slf4j
public class DormantState extends State {

private final ClientProperties clientProperties;

private final AtomicReference<Mono<Void>> initialRequest = new AtomicReference<>();

public DormantState(GroupInitialStateService groupInitialStateService, ClientProperties clientProperties) {
super(groupInitialStateService);
this.clientProperties = clientProperties;
}

/**
* Triggers the initialization of the current state of groups and their members.
* Note that this method will only trigger the initialization once, subsequent calls will return the same request.
* This is done by caching the mono, making it a hot source.
*
* @return a Mono that will initialize the current state of groups and their members
*/
@Override
public Mono<Void> onRequest() {
initialRequest.compareAndSet(null,
groupInitialStateService.initializeGroupState()
.timeout(Duration.ofMillis(clientProperties.getGroupsTimeoutMilliseconds()))
.doOnError(throwable -> log.error("Error initializing group state", throwable))
.doOnSuccess(empty -> log.info("Successfully initialized group state"))
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
groupInitialStateService.setState(new ReadyState(groupInitialStateService));
} else if (signalType == SignalType.ON_ERROR) {
groupInitialStateService.setState(
new DormantState(groupInitialStateService, clientProperties));
}
}).cache()
);

final Mono<Void> cachedRequest = initialRequest.get();

groupInitialStateService.setState(new LoadingState(groupInitialStateService, cachedRequest));
return cachedRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.grouphq.groupsync.group.sync.state;

import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.HashSet;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.config.ClientProperties;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.group.sync.GroupFetchService;
import org.grouphq.groupsync.group.sync.GroupUpdateService;
import org.grouphq.groupsync.groupservice.domain.groups.Group;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/**
* Maintains the current relevant events to enable clients
* to build the current state of groups and their members.
* Relies on Java's Standard Library Concurrency Utilities
*/
@Service
@Slf4j
public class GroupInitialStateService {

private final GroupFetchService groupFetchService;
private final GroupUpdateService groupUpdateService;
private final GroupStateService groupStateService;
private final ClientProperties clientProperties;

@Getter
@Setter(AccessLevel.PACKAGE)
private volatile State state;

@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
private transient Disposable updateSubscription;


public GroupInitialStateService(GroupFetchService groupFetchService, GroupUpdateService groupUpdateService,
GroupStateService groupStateService, ClientProperties clientProperties) {
this.groupFetchService = groupFetchService;
this.groupUpdateService = groupUpdateService;
this.groupStateService = groupStateService;
this.clientProperties = clientProperties;
setState(new DormantState(this, clientProperties));
}

public Flux<PublicOutboxEvent> requestCurrentEvents() {
return state.onRequest()
.thenMany(buildState());
}

private Flux<PublicOutboxEvent> buildState() {
return groupStateService.getCurrentGroupEvents()
.filter(event -> event.eventData() instanceof Group)
.flatMap(event -> {
final Group group = (Group) event.eventData();
return groupStateService.getMembersForGroup(group.id())
.collectList()
.map(publicMembers -> {
final Group groupWithMembers = group.withMembers(new HashSet<>(publicMembers));
return event.withNewEventData(groupWithMembers);
});
});
}

protected Mono<Void> initializeGroupState() {
return groupStateService.resetState().then(
groupFetchService.getGroupsAsEvents()
.flatMap(groupStateService::saveGroupEvent)
.retryWhen(
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds()))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry(retrySignal -> log.warn("Retrying due to error", retrySignal.failure())))
.doOnComplete(this::createUpdateSubscription)
.doOnError(error -> log.error("Error getting initial state of events", error))
.then(Mono.empty())
);
}

private void createUpdateSubscription() {
disposeUpdateSubscription();

updateSubscription = keepGroupsAndMembersUpToDate()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}

private void disposeUpdateSubscription() {
if (updateSubscription != null && !updateSubscription.isDisposed()) {
updateSubscription.dispose();
}
}

/**
* Methods annotated with PreDestroy are called by the Spring framework before destroying the service bean.
*/
@PreDestroy
public void cleanUp() {
disposeUpdateSubscription();
}

protected Flux<PublicOutboxEvent> keepGroupsAndMembersUpToDate() {
return groupUpdateService.publicUpdatesStream()
.flatMap(groupStateService::handleEventUpdate)
.doOnError(throwable -> log.error("Error keeping groups and members up to date", throwable));
}


}
Loading

0 comments on commit 5780eb6

Please sign in to comment.