Kafka in Kubernetes – Marking the coordinator dead for group

I am new to Kubernetes and want to use it to set up Kafka and ZooKeeper. I was able to set up Apache Kafka and ZooKeeper in Kubernetes using StatefulSets. I followed this to build my manifest files. I created one replica each of Kafka and ZooKeeper, and I am also using persistent volumes. All pods are running and ready.

I tried to expose Kafka using a Service by specifying a nodePort (30010). It looks like this exposes Kafka to the outside world, so that external clients can send messages to the Kafka broker and consume from it.

But in my Java application I created a consumer and added the bootstrapServer as :30010, and it showed the following logs:

    INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
    INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener

Interestingly, when I test the cluster using kubectl commands, I am able to produce and consume messages:

    kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
      -- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093 done;

    kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm \
      -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.default.svc.cluster.local:9093

Can someone point me in the right direction as to why it is marking the coordinator dead?

kafka.yml

    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-hs
      labels:
        app: kafka
    spec:
      ports:
      - port: 9093
        name: server
      clusterIP: None
      selector:
        app: kafka
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-cs
      labels:
        app: kafka
    spec:
      type: NodePort
      ports:
      - port: 9093
        nodePort: 30010
        protocol: TCP
      selector:
        app: kafka
    ---
    apiVersion: apps/v1beta1
    kind: StatefulSet
    metadata:
      name: kafka
    spec:
      serviceName: kafka-hs
      replicas: 1
      podManagementPolicy: Parallel
      updateStrategy:
        type: RollingUpdate
      template:
        metadata:
          labels:
            app: kafka
        spec:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - kafka
                topologyKey: "kubernetes.io/hostname"
            podAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
              - weight: 1
                podAffinityTerm:
                  labelSelector:
                    matchExpressions:
                    - key: "app"
                      operator: In
                      values:
                      - zk
                  topologyKey: "kubernetes.io/hostname"
          terminationGracePeriodSeconds: 300
          containers:
          - name: k8skafka
            imagePullPolicy: Always
            image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
            resources:
              requests:
                memory: "1Gi"
                cpu: "0.5"
            ports:
            - containerPort: 9093
              name: server
            command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
              --override listeners=PLAINTEXT://:9093 \
              --override zookeeper.connect=zk-cs.default.svc.cluster.local:2181 \
              --override log.dir=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=false \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=168 \
              --override log.roll.hours=168 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override inter.broker.protocol.version=0.10.2-IV0 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
              --override log.message.timestamp.type=CreateTime \
              --override log.preallocate=false \
              --override log.retention.check.interval.ms=300000 \
              --override max.connections.per.ip=2147483647 \
              --override num.partitions=1 \
              --override producer.purgatory.purge.interval.requests=1000 \
              --override replica.fetch.backoff.ms=1000 \
              --override replica.fetch.max.bytes=1048576 \
              --override replica.fetch.response.max.bytes=10485760 \
              --override reserved.broker.max.id=1000 "
            env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
            volumeMounts:
            - name: kafka-pv-volume
              mountPath: /var/lib/kafka
            readinessProbe:
              exec:
                command:
                - sh
                - -c
                - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
          securityContext:
            runAsUser: 0
            fsGroup: 1000
      volumeClaimTemplates:
      - metadata:
          name: kafka-pv-volume
        spec:
          storageClassName: manual
          accessModes: [ "ReadWriteOnce" ]
          resources:
            requests:
              storage: 1Gi

zookeeper.yml

    apiVersion: v1
    kind: Service
    metadata:
      name: zk-hs
      labels:
        app: zk
    spec:
      ports:
      - port: 2888
        name: server
      - port: 3888
        name: leader-election
      clusterIP: None
      selector:
        app: zk
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-cs
      labels:
        app: zk
    spec:
      ports:
      - port: 2181
        name: client
      selector:
        app: zk
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: zk
    spec:
      selector:
        matchLabels:
          app: zk
      serviceName: zk-hs
      replicas: 1
      updateStrategy:
        type: RollingUpdate
      podManagementPolicy: Parallel
      template:
        metadata:
          labels:
            app: zk
        spec:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
                topologyKey: "kubernetes.io/hostname"
          containers:
          - name: kubernetes-zookeeper
            imagePullPolicy: Always
            image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10"
            resources:
              requests:
                memory: "1Gi"
                cpu: "0.5"
            ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
            command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=1 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=512M \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
            readinessProbe:
              exec:
                command:
                - sh
                - -c
                - "zookeeper-ready 2181"
              initialDelaySeconds: 10
              timeoutSeconds: 5
            livenessProbe:
              exec:
                command:
                - sh
                - -c
                - "zookeeper-ready 2181"
              initialDelaySeconds: 10
              timeoutSeconds: 5
            volumeMounts:
            - name: pv-volume
              mountPath: /var/lib/zookeeper
          securityContext:
            runAsUser: 0
            fsGroup: 1000
      volumeClaimTemplates:
      - metadata:
          name: pv-volume
        spec:
          storageClassName: manual
          accessModes: [ "ReadWriteOnce" ]
          resources:
            requests:
              storage: 1Gi

Edit:

I changed the log level to TRACE. These are the logs I got:

    2018-01-11 18:56:24,617 TRACE o.a.k.c.NetworkClient - Completed receive from node -1, for key 3, received {brokers=[{node_id=0,host=kafka-0.kafka-hs.default.svc.cluster.local,port=9093,rack=null}],cluster_id=LwSLmJpTQf6tSKPsfvriIg,controller_id=0,topic_metadata=[{topic_error_code=0,topic=mdm.worker.request,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=0,leader=0,replicas=[0],isr=[0]}]}]}
    2018-01-11 18:56:24,621 DEBUG o.a.k.c.Metadata - Updated cluster metadata version 2 to Cluster(id = LwSLmJpTQf6tSKPsfvriIg, nodes = [kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)], partitions = [Partition(topic = mdm.worker.request, partition = 0, leader = 0, replicas = [0], isr = [0])])
    2018-01-11 18:56:24,622 TRACE o.a.k.c.NetworkClient - Completed receive from node -1, for key 10, received {error_code=0,coordinator={node_id=0,host=kafka-0.kafka-hs.default.svc.cluster.local,port=9093}}
    2018-01-11 18:56:24,624 DEBUG o.a.k.c.c.i.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1515678984622, latencyMs=798, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null))) for group workerListener
    2018-01-11 18:56:24,625 INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
    2018-01-11 18:56:24,625 DEBUG o.a.k.c.NetworkClient - Initiating connection to node 2147483647 at kafka-0.kafka-hs.default.svc.cluster.local:9093.
    2018-01-11 18:56:24,633 DEBUG o.a.k.c.NetworkClient - Error connecting to node 2147483647 at kafka-0.kafka-hs.default.svc.cluster.local:9093:
    java.io.IOException: Can't resolve address: kafka-0.kafka-hs.default.svc.cluster.local:9093
        at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:462)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:598)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:579)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.UnresolvedAddressException: null
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
        at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
        ... 22 common frames omitted
    2018-01-11 18:56:24,634 INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener
    2018-01-11 18:56:24,735 TRACE o.a.k.c.NetworkClient - Found least loaded node kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)
    2018-01-11 18:56:24,735 DEBUG o.a.k.c.c.i.AbstractCoordinator - Sending GroupCoordinator request for group workerListener to broker kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)
    2018-01-11 18:56:24,735 DEBUG o.a.k.c.NetworkClient - Initiating connection to node 0 at kafka-0.kafka-hs.default.svc.cluster.local:9093.
    2018-01-11 18:56:24,736 DEBUG o.a.k.c.NetworkClient - Error connecting to node 0 at kafka-0.kafka-hs.default.svc.cluster.local:9093:
    java.io.IOException: Can't resolve address: kafka-0.kafka-hs.default.svc.cluster.local:9093
        at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:408)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:223)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.UnresolvedAddressException: null
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
        at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
        ... 15 common frames omitted
    2018-01-11 18:56:24,737 TRACE o.a.k.c.NetworkClient - Removing node kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0
    2018-01-11 18:56:24,737 TRACE o.a.k.c.NetworkClient - Least loaded node selection failed to find an available node
    2018-01-11 18:56:24,738 DEBUG o.a.k.c.NetworkClient - Give up sending metadata request since no node is available

I ran into the same problem as you last week and solved it, so it is possible to expose Kafka outside of Kubernetes!

Solution: in your Kafka broker-config.yaml you should map the cluster's external IP to the local DNS name

    kafka-I.kafka-hs.default.svc.cluster.local:9093

How:

Add these to your server.properties file:

    listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
    inter.broker.listener.name=INTERNAL_PLAINTEXT

If you have an init script that runs before server.properties is used, you should add:

    # add unique label to each pod
    kubectl label pods ${HOSTNAME} kafka-set-component=${HOSTNAME}

    EXTERNAL_LISTENER_IP=
    EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))

    sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=INTERNAL_PLAINTEXT:\/\/0.0.0.0:9092,EXTERNAL_PLAINTEXT:\/\/0.0.0.0:9093/" /etc/kafka/server.properties
    sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=INTERNAL_PLAINTEXT:\/\/$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT:\/\/$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT/" /etc/kafka/server.properties

Otherwise, you should find a way to set or replace these settings in server.properties at runtime.
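With the image from the question, one way to do that at runtime is to keep everything on the kafka-server-start.sh command line instead of editing the file. A minimal sketch, assuming the same two-listener layout as above, that EXTERNAL_LISTENER_IP / EXTERNAL_LISTENER_PORT have already been resolved by an init step (both are placeholders here), and that you adjust the internal DNS suffix to your own headless service and namespace:

    # Sketch only: the same settings as above, expressed as --override flags on the
    # kafka-server-start.sh command already used in the question's StatefulSet.
    # EXTERNAL_LISTENER_IP and EXTERNAL_LISTENER_PORT must be resolved beforehand.
    exec kafka-server-start.sh /opt/kafka/config/server.properties \
      --override broker.id=${HOSTNAME##*-} \
      --override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
      --override inter.broker.listener.name=INTERNAL_PLAINTEXT \
      --override listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093 \
      --override advertised.listeners=INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://${EXTERNAL_LISTENER_IP}:${EXTERNAL_LISTENER_PORT}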

Note that you must comment out these lines in the server.properties file:

    #listeners=PLAINTEXT://:9092
    #advertised.listeners=PLAINTEXT://your.host.name:9092

Services: create a headless service that provides the local DNS records, plus one service per broker you have:

    # A headless service to create DNS records
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: broker
      namespace: kafka
    spec:
      ports:
      - port: 9092  # [podname].broker.kafka.svc.cluster.local
      clusterIP: None
      selector:
        app: kafka
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: broker-0
      namespace: kafka
    spec:
      type: NodePort
      ports:
      - port: 9093
        nodePort: 30093
      selector:
        kafka-set-component: kafka-0
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: broker-1
      namespace: kafka
    spec:
      type: NodePort
      ports:
      - port: 9093
        nodePort: 30094
      selector:
        kafka-set-component: kafka-1
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: broker-2
      namespace: kafka
    spec:
      type: NodePort
      ports:
      - port: 9093
        nodePort: 30095
      selector:
        kafka-set-component: kafka-2
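Once these services are in place and the brokers advertise the node addresses, an external client can bootstrap directly against a broker's NodePort. A rough check with the console consumer (a sketch, assuming the Kafka CLI tools are installed on the client machine; <node-external-ip> is a placeholder):

    # Sketch: consume through broker-0's NodePort from outside the cluster.
    # The topic name is only an example.
    kafka-console-consumer.sh --topic test \
      --bootstrap-server <node-external-ip>:30093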

Note: if you are running on GKE:

  1. The EXTERNAL_LISTENER_IP that you declare in the server.properties init can be found with gcloud compute instances list; one way to look it up automatically from inside the pod is sketched after this list.
  2. You also have to open the firewall for those NodePorts: gcloud compute firewall-rules create kafka-external --allow tcp:30093,tcp:30094,tcp:30095
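A minimal sketch of that lookup for the init step, assuming the pods run on GKE nodes with external IPs and are allowed to query the GCE metadata server (this is an addition of mine, not part of the setup above):

    # Sketch: resolve the node's external IP from the GCE metadata server at
    # start-up, then derive the per-broker external port as before.
    EXTERNAL_LISTENER_IP=$(curl -s -H "Metadata-Flavor: Google" \
      "http://metadata.google.internal/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip")
    EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))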