Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available

apache-kafka kafka-consumer-api

-- Consumer

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerTutorial {
    public static void main(String[] args) {
        Properties props = new Properties();
        String groupId = "consumer-tutorial-group";
        List<String> topics = Arrays.asList("consumer-tutorial");
        props.put("bootstrap.servers", "192.168.1.75:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(topics);
            while (true) {
                // In the 0.10.x API, poll(long) blocks up to the given timeout in ms
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
            }
        } catch (Exception e) {
            System.out.println(e.toString());
        } finally {
            consumer.close();
        }
    }
}

I am trying to run the code above. It is a simple consumer that reads from a topic, but I get a weird exception that I cannot get past:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available

Here is my producer code as well:

-- Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTutorial {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.7:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send 100 records whose key and value are both the loop index
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }
}
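
As a quick sanity check that the records actually reached the topic, the console consumer that ships with Kafka can read them back. This assumes the same ZooKeeper address used in the topic-creation command further down; adjust to your setup:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.75:2181 --topic consumer-tutorial --from-beginning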

Here is how I set up Kafka:

-- Start ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

-- Start Kafka server

bin/kafka-server-start.sh config/server.properties

-- Create a topic

bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper 192.168.1.75:2181
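
To confirm the topic exists with the expected three partitions, the same tool can describe it:

bin/kafka-topics.sh --describe --topic consumer-tutorial --zookeeper 192.168.1.75:2181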

-- Kafka 0.10.0 dependencies (pom.xml)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
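
A side note on the dependencies: the kafka_2.11 artifact is the full broker and is not needed just to run a producer or consumer; kafka-clients alone is enough. A minimal dependency block, assuming the broker also runs 0.10.0.0, would be:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>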

Best Answer

I hit the same issue when using the kafka_2.11 artifact with version 0.10.0.0, but it was resolved once I changed the Kafka server to 0.10.0.0. Earlier I was pointing to 0.9.0.1. It looks like the server version and your pom version should be in sync.
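
This matches the error: the 0.10.0.0 client requests a newer metadata format than a 0.9.0.1 broker speaks, so the response fails to parse and the client throws the SchemaException above. One way to check which version a broker is actually running (assuming a standard binary installation, with KAFKA_HOME pointing at it) is to look at the jar names in its libs directory, e.g. kafka_2.11-0.9.0.1.jar:

ls $KAFKA_HOME/libs | grep kafka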
