My Profile Photo

Andreas Peters


CEO of AVENTER UG (haftungsbeschränkt)
and MÙLÍN UG (haftungsbeschränkt)


Kafka Einfuehrung

In diesem Post moechte ich eine kleine Einfuehrung in Kafka geben und wie dieses mit Golang zu bedienen ist.

Kafka Tutorial

Kafka Kategorien.

Kafka beinhaltet 5 Kategorien der Verarbeitung. Diese werden nachfolgend kurz beschrieben.

  • Topic: Eine Topic ist ein Feed in dem alle messages veröffentlich werden die vom message producer stammen. Die Topics sind in Partitionen aufgeteilt. Jede Partition repräsentiert eine unveränderliche Sequenz von messages. Die Partitionen werden von Kafka verwaltet. Jede Message hat eine eindeutige offset Nummer innerhalb der Partition.
  • Broker: Ein Kafka Cluster besteht aus einem oder mehreren Servern auf welchen jeweils einen oder mehrere Server Prozesse laufen. Diese Server Prozesse werden in Kafka Broker genannt. Die oben beschriebene Topics werden jeweils im Kontext eines Brokers erstellt.
  • Zookeeper: Zookeeper coordiniert die ablauf zwischen den Kafka Broker und dem consumer. Details bitte der Zookeeper Doku entnaehmen. http://wiki.apache.org/hadoop/ZooKeeper/ProjectDescription Zookeeper erlaubt es, dass verteilte Prozesse sich unter einander koordinieren können. Dies erfolgt über einen hierarchischen Name Space, ähnlich einem Filesystem aufgebaut. Gespeichert werden: Status Information, Configuration, Location information, und einiges mehr.
  • Producers: Producers veröffentlichen Daten zu den Topics. Um die Last zu verteilen, erfolgt die Auswahl der Partition innerhalb eines Topics über RoundRobin.
  • Consumer: Consumers sind Applikationen oder Prozesse welche die Topics subscribes haben und dessen messages bearbeiten.

Starten einer Single Node Instance

Jede Kafka Installation benötigt einen Zookeeper. Den starten wir als ersten.

docker run --name zookeeper -p 2181:2181 -d zookeeper
docker logs -f zookeeper
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
2018-02-09 10:08:00,958 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /conf/zoo.cfg
2018-02-09 10:08:00,963 [myid:] - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2018-02-09 10:08:00,963 [myid:] - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 0
2018-02-09 10:08:00,963 [myid:] - INFO  [main:DatadirCleanupManager@101] - Purge task is not scheduled.
2018-02-09 10:08:00,964 [myid:] - WARN  [main:QuorumPeerMain@116] - Either no config or no quorum defined in config, running  in standalone mode
2018-02-09 10:08:00,976 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /conf/zoo.cfg
2018-02-09 10:08:00,976 [myid:] - INFO  [main:ZooKeeperServerMain@98] - Starting server
2018-02-09 10:08:00,983 [myid:] - INFO  [main:Environment@100] - Server environment:zookeeper.version=3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:host.name=2234626d2795
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.version=1.8.0_151
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.vendor=Oracle Corporation
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.home=/usr/lib/jvm/java-1.8-openjdk/jre
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.class.path=/zookeeper-3.4.11/bin/../build/classes:/zookeeper-3.4.11/bin/../build/lib/*.jar:/zookeeper-3.4.11/bin/../lib/slf4j-log4j12-1.6.1.jar:/zookeeper-3.4.11/bin/../lib/slf4j-api-1.6.1.jar:/zookeeper-3.4.11/bin/../lib/netty-3.10.5.Final.jar:/zookeeper-3.4.11/bin/../lib/log4j-1.2.16.jar:/zookeeper-3.4.11/bin/../lib/jline-0.9.94.jar:/zookeeper-3.4.11/bin/../lib/audience-annotations-0.5.0.jar:/zookeeper-3.4.11/bin/../zookeeper-3.4.11.jar:/zookeeper-3.4.11/bin/../src/java/lib/*.jar:/conf:
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.library.path=/usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64/server:/usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64:/usr/lib/jvm/java-1.8-openjdk/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.io.tmpdir=/tmp
2018-02-09 10:08:00,984 [myid:] - INFO  [main:Environment@100] - Server environment:java.compiler=<NA>
2018-02-09 10:08:00,985 [myid:] - INFO  [main:Environment@100] - Server environment:os.name=Linux
2018-02-09 10:08:00,986 [myid:] - INFO  [main:Environment@100] - Server environment:os.arch=amd64
2018-02-09 10:08:00,986 [myid:] - INFO  [main:Environment@100] - Server environment:os.version=4.4.114-1-MANJARO
2018-02-09 10:08:00,986 [myid:] - INFO  [main:Environment@100] - Server environment:user.name=zookeeper
2018-02-09 10:08:00,986 [myid:] - INFO  [main:Environment@100] - Server environment:user.home=/home/zookeeper
2018-02-09 10:08:00,987 [myid:] - INFO  [main:Environment@100] - Server environment:user.dir=/zookeeper-3.4.11
2018-02-09 10:08:00,994 [myid:] - INFO  [main:ZooKeeperServer@825] - tickTime set to 2000
2018-02-09 10:08:00,994 [myid:] - INFO  [main:ZooKeeperServer@834] - minSessionTimeout set to -1
2018-02-09 10:08:00,994 [myid:] - INFO  [main:ZooKeeperServer@843] - maxSessionTimeout set to -1
2018-02-09 10:08:01,003 [myid:] - INFO  [main:ServerCnxnFactory@117] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
2018-02-09 10:08:01,008 [myid:] - INFO  [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181

Als nächste starten wir Kafka und linken ihn im Single Node Fall mit zookeeper. In einer Multinode Umgebung kann man dem Docker Image die entsprechenden Parameter übergeben.

docker run --link zookeeper:zookeeper -e ZOOKEEPER=zookeeper:2181 --name kafka avhost/docker-kafka
.
.
.
[2018-02-09 10:27:41,101] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-02-09 10:27:41,102] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(84239d549948,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-02-09 10:27:41,104] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2018-02-09 10:27:41,161] INFO Kafka version : 0.11.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2018-02-09 10:27:41,161] INFO Kafka commitId : 73be1e1168f91ee2 (org.apache.kafka.common.utils.AppInfoParser)
[2018-02-09 10:27:41,163] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

Testen der Umgebung

Erstellen eins Topics

docker exec -it kafka /kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic kafkatopic 
Created topic "kafkatopic".

Senden einer Message

docker exec -it kafka /kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
>test
>test:1
>blub
>welcome
>

Lesen eines Message feeds

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic kafkatopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].



help
test
test:1
blub
welcome

Kafka feed mit Golang lesen

Details zum Golang Package sind hier zu finden. https://github.com/confluentinc/confluent-kafka-go

Das folgende Beispielcode zeigt, wie man einen Kafka feed mithilfe von Go abonniert.

// Example channel-based high-level Apache Kafka consumer
package main

/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"os"
	"os/signal"
	"syscall"
)

func main() {

	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               broker,
		"group.id":                        group,
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"}})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)

	run := true

	for run == true {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false

		case ev := <-c.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Assign(e.Partitions)
			case kafka.RevokedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Unassign()
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
			case kafka.PartitionEOF:
				fmt.Printf("%% Reached %v\n", e)
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				run = false
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

Im Gegensatz zum Beispiel im vorherigen Abschnitt, greifen wir hier nicht auf den zookeeper, sondern direkt auf den Broker zu. Der Zugriff ueber Zookeeper ist depricated.

go run golang_consumer.go localhost:9092 test-consumer-group kafkatopic

Hier wird es zu einer Fehlermeldung kommen. Eine nicht aufzuloesende Docker ID wird ausgeworfen. Diese einfach in die /etc/hosts mit der IP von localhost eintragen. Dann funktioniert es! Die Gruppe test-consumer-group ist eine Kafka default Gruppe.

Starten einer Single Node aber multible Broker Umgebung

Zunaechst muss ein Zookeeper gestartet werden.

docker run --name zookeeper -p 2181:2181 -d zookeeper

Dann koennen wir zwei Broker starten. Wichtig ist, die Broker ID muss sich aendern und da wir alles auf einem Server starten auch der Port.

Broker 1

docker run --link zookeeper:zookeeper --env ZOOKEEPER=zookeeper:2181 --env BROKER_ID=1 --name kafka1 -p 9092:9092 avhost/docker-kafka:latest

Broker 2

docker run --link zookeeper:zookeeper --env ZOOKEEPER=zookeeper:2181 --env BROKER_ID=2 --name kafka2 -p 9093:9092 avhost/docker-kafka:latest

Broker 3

docker run --link zookeeper:zookeeper --env ZOOKEEPER=zookeeper:2181 --env BROKER_ID=3 --name kafka3 -p 9094:9092 avhost/docker-kafka:latest

Erstellen eines Topics

Das erstellen eines Topics in einer Multibroker Umgebung ist nicht viel anders als in einer Singlenode Installation. Man gibt lediglich den Replications Faktor und die Partitionen an.

docker exec -it kafka /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 1 --topic replicated-kafkatopic

Senden einer Message

Beim Senden einer Message mit mehreren Brokern, muss man diese als Liste mit uebergeben. Da wir in diesem Beispiel alles auf einem SingeNode haben, aber im Docker Container, geben wir die IP Adresse des Docker Interfaces an.

docker exec -it kafka /kafka/bin/kafka-console-producer.sh --broker-list 172.17.0.1:9092,172.17.0.1:9093,172.17.0.1:9094 --topic replicated-kafkatopic