
KAFKA-EXTENSION

WHAT IS THIS PROJECT ABOUT

This is a dependency that provides functionality to read Zipkin Span lists as an OpenTracing implementation of metrics. Messages with metrics content are read from Kafka topics and can be serialized and deserialized, abstracting away the work that is common to every project of ours that needs this feature.

Exposing metrics to be processed by tools gives us knowledge about how our system works: the traffic it is supporting, where it has weaknesses, and when it definitely breaks. There are plenty of such tools, such as Zipkin UI, Prometheus, etc. They all provide a good bunch of views to monitor our service metrics, but we might want to build something custom, appropriate for our tailored solutions.

The purpose here is to abstract the implementation that gathers the correlated spans into a linked list. That list gives us the next span across the multiple services involved in the request, as well as the client span that made the request to the next server, so we can gather durations and details of how the connection to a third party went.

At the moment, the implementation focuses on the HTTP and RPC metrics, following the conventions in the next link.

More work and testing are needed before relying on this idea.

WHY THIS NEW DEPENDENCY AND WHAT IT CONTAINS

This came out of working on an observability feature in conjunction with a Spring Boot project. Spring Boot comes with Micrometer as its metrics provider, which lets us push metrics into a Kafka topic to be consumed by tools that monitor the traffic with the OpenTracing standard, such as Zipkin, Jaeger, OpenCensus, etc.

The convention for OpenTracing can be found at:

As it says, it is just a convention, but we also need an implementation to get the information and deal with its content. In our Java world there is a well-known open source project that works quite well with this convention and has evolved along its different versions.

Its GitHub page can be found at:

From the Zipkin project we get the Span definition, as well as the serializer and deserializer together with the span format detector, in case the metrics are encoded in any of the known message types (JSON_V1, JSON_V2, THRIFT, PROTO3).
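As a minimal sketch of that detection step (plain zipkin2 codec API, not part of this dependency):

import java.util.List;

import zipkin2.Span;
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoderDetector;

class SpanDecodingSketch {

    // Detects which of the known encodings (JSON_V1, JSON_V2, THRIFT, PROTO3)
    // the raw message uses, then decodes it into a list of Zipkin Spans.
    static List<Span> decode(byte[] rawMessage) {
        BytesDecoder<Span> decoder = SpanBytesDecoderDetector.decoderForListMessage(rawMessage);
        return decoder.decodeList(rawMessage);
    }
}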

Putting more tech on the table, and given the nature of the project, Kafka is the transport channel that brings us the messages, which first have to be handled somehow by the Zipkin deserializer.

Dealing with the messages from the Kafka topic

The way Kafka turns a message into a format known by our application is through Serdes (SErializer/DESerializer), some of them provided by Kafka and some implemented by third parties, like the one we have implemented here. One that stands out comes from the Spring project and reads those messages as JSON; if that is enough for you, I encourage you to use it rather than creating a new one.
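For illustration only, if plain JSON mapping were enough, Spring's JsonSerde (from spring-kafka; the MyEvent POJO here is hypothetical) needs nothing more than the target type:

import org.springframework.kafka.support.serializer.JsonSerde;

class JsonSerdeSketch {

    // Hypothetical POJO standing in for your own message payload.
    public static class MyEvent {
        public String name;
    }

    // Spring's JsonSerde maps topic values to/from JSON without custom code.
    static JsonSerde<MyEvent> valueSerde() {
        return new JsonSerde<>(MyEvent.class);
    }
}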

In our particular case, the Span doesn't meet the requirements to be converted into a JSON object, so we have implemented our own:

com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde

With the message read from the topic, and given a content-type of application/opentracing, it deserializes the messages as a NodeTrace, which is a bunch of Zipkin Spans plus the common traceId that all the spans should share.
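A rough sketch of using the Serde by hand (in a real application it is wired through configuration instead; getTraceId() appears in the example further below, everything else about NodeTrace is assumed):

import com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde;

class SerdeUsageSketch {

    // NodeTrace also comes from this dependency; its package is not shown in
    // this README, so its import is omitted here.
    static String traceIdOf(byte[] rawValue) {
        OpentracingSerde serde = new OpentracingSerde();
        NodeTrace trace = serde.deserializer().deserialize("app_metrics", rawValue);
        return trace.getTraceId(); // the traceId shared by all spans in the NodeTrace
    }
}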

Linking the spans

If we were working with Kafka alone we would have to build the pipeline to work out the linked spans ourselves, but with Kafka Streams we are able to group information, transform it, and essentially operate with the functionality given by that library. So, from now on we assume we are working with KAFKA STREAMS.

A request is resolved (or should be resolved) in some amount of time (hopefully short). Once we start receiving the metrics from the different tasks involved in the request, we want to group them into a list of spans with the same traceId so that, when all the events have been emitted, we have everything needed to reconstruct the path that was followed.

The work of producing the linked spans is done by an aggregator, which takes the spans and links them so that the path can be followed:

com.currofy.kafka.extensions.group.aggregator.NodeTraceAggregator

The result is a LinkedSpanList containing all the LinkedSpans, from which we can get the root or initial span (the one without a parentId). A LinkedSpan does not currently contain all the information of the Zipkin Span, only the essentials (this needs to be revisited: what is essential?), so that the new messages going to Kafka topics carry smaller objects.
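For orientation, such an aggregator implements the Kafka Streams Aggregator contract. The sketch below shows only its shape; the addAndLink method name is a hypothetical stand-in for the actual linking logic:

import org.apache.kafka.streams.kstream.Aggregator;

// Sketch of the contract only: Kafka Streams calls apply() once per incoming
// NodeTrace, and its spans are folded into the running LinkedSpanList.
// (NodeTrace and LinkedSpanList come from this dependency; imports omitted.)
class NodeTraceAggregatorSketch implements Aggregator<String, NodeTrace, LinkedSpanList> {

    @Override
    public LinkedSpanList apply(String traceId, NodeTrace trace, LinkedSpanList aggregate) {
        aggregate.addAndLink(trace); // hypothetical method: links the new spans in
        return aggregate;
    }
}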

This list of LinkedSpans can then be used as a java.util.stream.Stream, from which we can filter on the attributes of the objects.
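A hedged example of that Stream usage (httpTraces(), getKind() and getId() appear in the example further below; getParentId() is an assumption matching the root-span description above):

import java.util.Optional;
import java.util.stream.Collectors;

import zipkin2.Span;

// LinkedSpan and LinkedSpanList come from this dependency; imports omitted.
class LinkedSpanQuerySketch {

    // The root span is the one without a parentId, as described above.
    static Optional<LinkedSpan> root(LinkedSpanList spans) {
        return spans.httpTraces()
                .filter(s -> s.getParentId() == null)
                .findFirst();
    }

    // Join the ids of all server-side HTTP spans.
    static String serverSpanIds(LinkedSpanList spans) {
        return spans.httpTraces()
                .filter(s -> s.getKind().equals(Span.Kind.SERVER))
                .map(LinkedSpan::getId)
                .collect(Collectors.joining(","));
    }
}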

HOW TO USE IT

This dependency is also a Spring starter, which provides a MessageConverter for the OpenTracing messages.

POM.XML

So, import this dependency together with the required spring-cloud-stream dependency, which creates the StreamMessageConverter in the Spring context.

If it is used in a Spring project with Spring Cloud:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>${spring-cloud-stream.version}</version>
    </dependency>
    <dependency>
        <groupId>com.currofy.kafka.extensions</groupId>
        <artifactId>kafka-extension</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
Note
At the moment this dependency is not uploaded to any repository, so to use it you have to clone it on your host and build it (for example with mvn install) so that it lands in your local Maven repository.

Configuring your application.yaml

We take for granted that you have a Kafka topic from which your application reads the metrics. In our case we name it app_metrics.

The reference guide to configure the kafka-streams-binder within a Spring Cloud application can be found at:

but we have some particular needs to bring the messages to our converters, so our configuration properties would look something like this:

spring:
  cloud:
    stream:
      bindings:
        metrics_in:
          destination: app_metrics
          content-type: application/opentracing
          group: metrics
      kafka:
        streams:
          binder:
            brokers: localhost:9092
          bindings:
            metrics_in:
              consumer:
                application-id: metrics_in-1
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde

Reading the configuration YAML file, we can see that the key in the topic (unused) is a string, and that the value is deserialized with our implementation.

Bear in mind the content-type, which is one of the keys that makes the messages get routed to our Serde.

Coding a bit

From Spring Cloud we can enable the bindings with @EnableBinding on a configuration class. An example could be:

public interface MetricBinding {

    String METRICS_IN = "metrics_in";

    @Input(METRICS_IN)
    KStream<String, NodeTrace> metrics_in();

}

with a configuration class such as:

@Configuration
@EnableBinding(MetricBinding.class)
public class BindingConfig {

}

This links the bindings with the topics given in our application.yaml file.

Once we have this burden configured, it is time to build what really matters: your logic.

As an example of a method that reads, groups, and converts those metrics, we might think of an implementation such as:

import java.time.Duration;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
import com.currofy.kafka.extensions.group.aggregator.NodeTraceAggregator;
import com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde;
import lombok.extern.slf4j.Slf4j;
import zipkin2.Span;

// NodeTrace, LinkedSpan, LinkedSpanList and LinkedSpanListSerde come from this
// dependency as well; their packages are not shown in this README.

@Slf4j
@Service
class MetricConsumerService {

    @StreamListener
    public void metrics(@Input(MetricBinding.METRICS_IN) final KStream<String, NodeTrace> stream) {
        // Group the traces by traceId, window them, and link the spans together.
        stream.groupBy((key, value) -> value.getTraceId(), Grouped.with(Serdes.String(), new OpentracingSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(20L)))
                .aggregate(
                        LinkedSpanList::new,
                        new NodeTraceAggregator(),
                        Materialized.with(Serdes.String(), new LinkedSpanListSerde())
                )
                .toStream()
                // Log the ids of the server-side HTTP spans of each window.
                .foreach((key, v) -> log.info("{}", v.httpTraces()
                        .filter(s -> s.getKind().equals(Span.Kind.SERVER))
                        .map(LinkedSpan::getId)
                        .collect(Collectors.joining(","))));
    }
}

NOTES

I have written a first approach of this dependency that suits Spring Cloud 2.x. As Spring Cloud 3.x is being released, I know some of this functionality is going to be deprecated, such as the StreamMessageConverter.

Also, the example uses @StreamListener, which Pivotal discourages in favour of functional programming with the Supplier, Function and Consumer interfaces.
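For reference, the functional style in Spring Cloud Stream 3.x registers the logic as a Consumer bean instead, roughly like this (a sketch under that assumption; the input binding then follows the metrics-in-0 naming convention):

import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// NodeTrace comes from this dependency; import omitted as above.
@Configuration
class FunctionalMetricConfig {

    // Bound to the topic via properties such as:
    // spring.cloud.stream.bindings.metrics-in-0.destination=app_metrics
    @Bean
    public Consumer<KStream<String, NodeTrace>> metrics() {
        return stream -> stream.foreach((key, trace) ->
                System.out.println(trace.getTraceId()));
    }
}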
