A collection of common Apache Kafka programming exceptions and their solutions.


How to create a new consumer group in Kafka

I am running Kafka locally following the instructions in the quick start guide here,
and then I defined my consumer group configuration in config/consumer.properties so that my consumer can pick up messages...
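For reference: there is no explicit "create group" step in Kafka; a consumer group comes into existence the first time a consumer polls with a given group.id. A minimal sketch with the console consumer (topic and group names are illustrative):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --group my-new-group --from-beginning
Setting group.id in config/consumer.properties and passing --consumer.config config/consumer.properties to the same tool achieves the same effect.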


object FlinkKafkaConsumer010 is not a member of package org.apache.flink.streaming.connectors.kafka

I am trying to assemble a small program that connects to a Kafka topic using Apache Flink. I need to use FlinkKafkaConsumer010.
package uimp
import org.apache.flink.streaming.api.scala._
import org.apa...
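This error usually means the Kafka connector artifact is missing from the build; FlinkKafkaConsumer010 ships in a separate dependency from flink-streaming-scala. A sketch for an sbt build, assuming Scala 2.11 and an illustrative Flink version:
// build.sbt: the Kafka 0.10 connector is its own artifact
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.10" % "1.4.2"
With that on the classpath, import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 should resolve.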


How to Construct Nested JSON Message on Output Topic in KSQLDB

From one of the source systems I received the event payload below and created Stream1 for this JSON payload.
Event JSON 1:
{
  "event": {
    "header": {
      "name": "abc...


Error starting Kafka connect in distributed mode

I am starting Kafka Connect in distributed mode but am getting the following error.
This works fine on my other dev cluster; however, I only get this error in my production cluster.
ERROR Stopping due to
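The truncated log line hides the root cause, but a common culprit when distributed mode works on one cluster and not another is the worker's internal-topic settings; for example, a replication factor larger than the broker count aborts startup. Illustrative connect-distributed.properties keys to check (values are examples only):
bootstrap.servers=broker1:9092
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3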


Is it possible to drop an aggregate (Kafka Streams) if a specific number of bytes is exceeded?

My goal is to set an aggregate to null once a certain number of bytes (of serialized JSON) is exceeded. Is there a way to do this, or is the retention time the only way to eventually delete aggregates?
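A hedged sketch of one answer: returning null from the aggregator deletes the aggregate for that key (the store entry is removed and a tombstone is forwarded), so the size check can live inside the aggregator. mergeJson and MAX_BYTES below are illustrative:
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

static final int MAX_BYTES = 1_000_000;  // illustrative size limit

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> aggregated = builder
    .<String, String>stream("input-topic")
    .groupByKey()
    .aggregate(
        () -> "{}",
        (key, value, agg) -> {
            String updated = mergeJson(agg, value);  // hypothetical JSON merge helper
            // Returning null drops the aggregate instead of growing it further.
            return updated.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES
                    ? null : updated;
        },
        Materialized.with(Serdes.String(), Serdes.String()));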


Is pre-authentication against a second url supported in kafka-connect-http?

I want to use a ready-made Kafka connector to fetch data from a REST API. I found the kafka-connect-http connector on the Confluent Hub, but this connector does not support pre-authentication of...


Quarkus BackPressure configuration

I got the following stack trace using Quarkus SmallRye Reactive Messaging with Kafka:
2020-07-24 01:38:31,662 ERROR [io.sma.rea.mes.kafka] (executor-thread-870) SRMSG18207: Unable to dispatch messa...
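The truncated SRMSG18207 line suggests messages are being produced faster than the Kafka channel can dispatch them. One hedged option is to set an overflow strategy on the emitter; the channel name and buffer size below are illustrative:
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

@ApplicationScoped
public class PriceProducer {

    @Inject
    @Channel("prices")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1024)
    Emitter<String> emitter;

    public void send(String payload) {
        emitter.send(payload);  // buffered until downstream demand catches up
    }
}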


EOS in REST Proxy

How can I implement exactly-once semantics when data is being injected into a Kafka cluster using the REST Proxy?
I am using Confluent Platform version 5.3, Community Edition.


Quarkus BackPressure management

I got the following stack trace using Quarkus Reactive Messaging with Kafka:
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.


org.apache.kafka.streams.errors.StreamsException Error

After adding retention.ms = "604,800,000" I am getting the error below from Kafka.
org.apache.kafka.streams.errors.StreamsException: task [0_7] Abort sending since an error caught with a previous
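Worth checking before anything else: retention.ms must be a plain long in milliseconds, so a value with commas (or quotes) such as "604,800,000" will not parse; one week is 604800000. A hedged example of setting it on a topic via the CLI (topic name is illustrative; older Kafka versions use --zookeeper instead of --bootstrap-server):
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config retention.ms=604800000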


Does Kafka consumer fetch-min-size (fetch.min.bytes) wait for the mentioned size to get filled?

Suppose there are 107 records, each record 1 KB. If the fetch size is 15 KB, then in 7 iterations 105 KB would be consumed. Now only 2 KB remain; will I get the remaining 2 records in the next iterati...
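For reference: the broker does not wait indefinitely for fetch.min.bytes to fill up; it answers as soon as either fetch.min.bytes is available or fetch.max.wait.ms (default 500 ms) expires, so the final 2 KB would still arrive. A hedged Java consumer-config sketch:
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Ask the broker for at least 15 KB per fetch ...
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 15 * 1024);
// ... but have it respond after 500 ms even if less data is available.
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);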


Kafka: How to Display Offsets

I'm super new to Kafka. I've installed Kafka and ZooKeeper using Homebrew on my Mac, and I'm playing around with the quickstart guide.
I've been able to push messages onto Kafka using the following
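Two hedged ways to display offsets from the CLI (group and topic names are illustrative): kafka-consumer-groups.sh shows the current offset, log-end offset, and lag per partition, while GetOffsetShell prints the latest offset of each partition.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic my-topic --time -1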


Kafka MirrorMaker2 - not mirroring consumer group offsets

I have set up MirrorMaker2 to replicate data between two DCs.
My mm2.properties:
# mm2.properties
name=source->dest
clusters=source, dest
source.bootstrap.servers=localhost:9091
dest.bootst...
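A hedged note: MM2 replicates group offsets via checkpoint topics rather than writing them to the target's __consumer_offsets directly, and automatic offset sync only exists from Kafka 2.7 (KIP-545). Illustrative mm2.properties additions:
source->dest.emit.checkpoints.enabled=true
# Requires Kafka 2.7+; translates and commits group offsets on the target cluster
source->dest.sync.group.offsets.enabled=true
source->dest.sync.group.offsets.interval.seconds=60
On older versions, offsets have to be translated client-side, e.g. with RemoteClusterUtils.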


How to create Kafka consumer group using command line?

I'm using the community version of Confluent 5.2. I created a new topic named employee-info.
I'm using Flink as a consumer for the above topic. In Flink, to add a Kafka source we need to pass in a grou...
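As with the first question in this list, the group is created implicitly: passing a group.id to the Kafka source is enough, and the group appears once the job starts polling. A hedged Flink sketch (names are illustrative; the exact consumer class depends on the connector version):
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "employee-info-group");  // group is created on first poll
FlinkKafkaConsumer<String> source =
    new FlinkKafkaConsumer<>("employee-info", new SimpleStringSchema(), props);
Afterwards, kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list should show the new group.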


Kafka Connect - Cannot ALTER to add missing field SinkRecordField{schema=Schema{BYTES}, name='CreateUID', isPrimaryKey=true},

I am using the JDBC source connector to read data from a Teradata table and push it to a Kafka topic. But when I try to use the JDBC sink connector to read the Kafka topic and push to an Oracle table, it throws ...
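Hedged context: the sink is attempting to ALTER the Oracle table to add CreateUID as a BYTES primary-key column, which the connector refuses to do. The relevant knobs are the pk.* and auto-evolution settings; an illustrative sink config fragment (connection details are placeholders):
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics": "teradata-topic",
  "connection.url": "jdbc:oracle:thin:@//dbhost:1521/ORCL",
  "pk.mode": "record_value",
  "pk.fields": "CreateUID",
  "auto.create": "true",
  "auto.evolve": "false"
}
A common resolution is to pre-create the Oracle table with the intended key column type, or cast the field to a supported type with an SMT, rather than letting auto-evolution add a BYTES key.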


Kafka-connect topic.prefix without the table name

I'm using the JDBC source connector; my table names contain special characters (e.g. $) that are acceptable to the DB engine, but when I run Kafka Connect with the configuration below, it attempts to create the Kafka...
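A hedged workaround: in query mode the JDBC source publishes to exactly topic.prefix, with no table name appended, so the special characters never reach the topic name. Illustrative config (connection details and the query are placeholders):
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://dbhost:3306/mydb",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "query": "SELECT * FROM \"MY$TABLE\"",
  "topic.prefix": "my_clean_topic"
}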


Confluent Kafka Connect: Run multiple sink connectors in a synchronous way

We are using the Kafka Connect S3 sink connector, which connects to Kafka and loads data into S3 buckets. Now I want to load data from the S3 buckets into AWS Redshift using the COPY command, and for that I'm creating my own


Partition By Multiple Nested Fields in Kafka Connect HDFS Sink

We are running the Kafka HDFS sink connector (version 5.2.1) and need the HDFS data to be partitioned by multiple nested fields. The data in the topics is stored as Avro and has nested elements. However, Connect
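Hedged note: the stock FieldPartitioner expects top-level field names (newer versions accept a comma-separated list), so nested fields are usually handled by flattening the record first or by writing a custom partitioner. An illustrative fragment using the built-in Flatten SMT:
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"partition.field.name": "event_header_name,event_header_region",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
The flattened field names above are assumptions about the record shape; adjust them to the actual Avro structure.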


JDBC Sink Connector -upserting into multiple tables from multiples topics using kafka-connect - Follow up

This is related to the topic discussed in the thread below:
JDBC Sink Connector -upserting into multiple tables from multiples topics using kafka-connect
I know it's a bit of an older post, but my question...


spring-cloud-stream and kafka-clients backward compatibility

We have a microservice that currently uses spring-cloud-stream Ditmars.RELEASE, which in turn uses kafka-clients 0.10.1.1.
We are interested in upgrading to spring-cloud-stream 2.0.0.RC3, which in t...


Data Modeling with Kafka? Topics and Partitions

One of the first things I think about when using a new service (such as a non-RDBMS data store or a message queue) is: "How should I structure my data?".
I've read and watched some introductory


How to configure multiple Kafka consumers in the application.yml file

I have a Spring Boot based microservice, and I have used Kafka to produce/consume data from different systems.
Now my question is: I have two different topics, and based on the topics I have two
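A hedged sketch using Spring Boot's spring.kafka properties, with per-listener group overrides in code; topic and group names are illustrative:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Each topic then gets its own @KafkaListener, e.g. @KafkaListener(topics = "topic-a", groupId = "group-a") and @KafkaListener(topics = "topic-b", groupId = "group-b"), so the two consumers can be tuned independently.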


Spring-Kafka: Rebalancing happening while using consumer pause/resume which should not as per documentation

Spring-Kafka: While pausing/resuming a consumer using the pause/resume methods, per the documentation a rebalance should not occur when automatic assignment is used, but it is not working; rebalancing happeni...
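A hedged note on the usual cause: pause() only stops record delivery, and the container must keep calling poll() for the consumer to stay in the group; if processing blocks longer than max.poll.interval.ms, the broker evicts the consumer and rebalances anyway. Pausing is typically done at the container level, as in this illustrative sketch:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;

@Autowired
private KafkaListenerEndpointRegistry registry;

public void pauseListener(String listenerId) {
    // The container keeps polling internally, so no rebalance is triggered.
    registry.getListenerContainer(listenerId).pause();
}

public void resumeListener(String listenerId) {
    registry.getListenerContainer(listenerId).resume();
}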


Kafka Consumer Rebalancing Algorithm

Can someone please tell me what the rebalancing algorithm is for Kafka consumers? I would like to understand how partition count and consumer threads affect this.
Thank you,
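For reference, the strategy is pluggable via partition.assignment.strategy: RangeAssignor (the default) hands each consumer a contiguous range of partitions per topic, while RoundRobinAssignor deals all partitions out one by one. An illustrative consumer setting:
import org.apache.kafka.clients.consumer.ConsumerConfig;

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");
Either way, each partition goes to exactly one consumer in the group, so threads beyond the partition count sit idle.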


Where do I define topic.metadata.refresh.interval.ms?

I'm testing Kafka a little, and hopefully I'm going to put it in my production stack soon.
I'm using kafka-console-producer.sh and kafka-console-consumer.sh to test Kafka's functionality....
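Hedged answer: topic.metadata.refresh.interval.ms is a producer-side property from the old Scala producer, so it belongs in the producer's configuration rather than in server.properties. Depending on the Kafka version, it may also be passable to the console tool directly (flag support varies by release):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \
  --producer-property topic.metadata.refresh.interval.ms=600000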


KAFKA: Connection to node failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-256

I've been trying to add SASL authentication to my Kafka brokers using SASL_PLAINTEXT SCRAM-SHA-256 for a while, but without any success. I keep getting the following error in Kafka's log file.
...
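A frequent cause of this error is that the SCRAM credentials were never registered, so the broker has nothing to verify the client against. Hedged example of creating them (user name and password are illustrative):
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=secret]' \
  --entity-type users --entity-name alice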


Kafka Producers/Consumers over WAN?

I have a Kafka cluster in a data center. A bunch of clients that may communicate across WANs (even the internet) will send/receive real-time messages to/from the cluster.
I read from Kafka's


Kafka - consumers / producers work with all ZooKeeper instances down

I've configured a cluster of Kafka brokers and a cluster of ZooKeeper instances using the kafka_2.11-1.1.0 distribution archive.
For the Kafka brokers I've configured config/server.properties with
broker.id=1,2,3


request.timeout.ms and Spring Kafka Synchronous Event Publishing with KafkaTemplate

I'm a bit confused about the best practice for configuring the timeout of an event published synchronously through Spring Kafka. The Spring Kafka documentation provides an example using ListenableFu...
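The usual pattern is to bound the synchronous wait explicitly on the returned future, independent of the client-side request.timeout.ms. A hedged Java sketch (topic and payload are illustrative):
import java.util.concurrent.TimeUnit;
import org.springframework.kafka.support.SendResult;

// get(...) throws a TimeoutException if the send is not acknowledged in time.
SendResult<String, String> result =
    kafkaTemplate.send("my-topic", "key", "value")
                 .get(10, TimeUnit.SECONDS);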


Kafka Streams: Any guarantees on ordering of saves to state stores when using at_least_once?

We have a Kafka Streams Java topology built with the Processor API.
In the topology we have a single processor that saves to multiple state stores.
As we use at_least_once, we would expect to see


Is there any way to forward Kafka messages from topic on one server to topic on another server?

I have a scenario where we are forwarding our application logs to a Kafka topic using fluentD agents.
The Kafka team introduced Kerberos authentication, and our fluentD version does not support this authenti...
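For reference, the stock tool for cluster-to-cluster forwarding is MirrorMaker: it consumes from one cluster and produces to another, and its producer side can carry the Kerberos/SASL settings that fluentD lacks. Illustrative invocation (config file names and the topic pattern are placeholders):
bin/kafka-mirror-maker.sh \
  --consumer.config plain-source-cluster.properties \
  --producer.config kerberized-dest-cluster.properties \
  --whitelist 'app-logs.*'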


Kafka Stream Exception: GroupAuthorizationException

I'm developing a Kafka Streams application that reads messages from an input Kafka topic, filters unwanted data, and pushes the result to an output Kafka topic.
Kafka Stream Configuration:
@Bean(name =
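GroupAuthorizationException means the authenticated principal lacks READ permission on the consumer group; for Kafka Streams the group id is the application.id. A hedged ACL example (principal and ids are illustrative):
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --command-config admin.properties \
  --add --allow-principal User:my-stream-app \
  --operation READ --group my-streams-application-id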


Is it possible to merge incomplete events into a KTable?

I wonder whether a KTable would meet our needs.
Let's say I have a topic myTopic in Kafka that contains events, and I plug a Kafka Streams app onto this topic.
Let's assume that at time t0, myTopic cont...
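A KTable by itself only keeps the latest value per key, so merging incomplete events usually means an aggregation that folds each partial event into the running state. A minimal Java sketch, assuming a hypothetical Event POJO with nullable fields and a matching eventSerde:
KTable<String, Event> merged = builder
    .<String, Event>stream("myTopic")
    .groupByKey()
    .aggregate(
        Event::new,
        (key, incoming, state) -> {
            // Copy only the fields the new (incomplete) event actually carries.
            if (incoming.getName() != null) state.setName(incoming.getName());
            if (incoming.getAmount() != null) state.setAmount(incoming.getAmount());
            return state;
        },
        Materialized.with(Serdes.String(), eventSerde));  // eventSerde is assumed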