Confluent's Kafka Python Client

Overview

Confluent's Python Client for Apache Kafka™

confluent-kafka-python provides a high-level Producer, Consumer and AdminClient compatible with all Apache Kafka™ brokers >= v0.8, Confluent Cloud and the Confluent Platform. The client is:

  • Reliable - It's a wrapper around librdkafka (provided automatically via binary wheels) which is widely deployed in a diverse set of production scenarios. It's tested using the same set of system tests as the Java client and more. It's supported by Confluent.

  • Performant - Performance is a key design consideration. Maximum throughput is on par with the Java client for larger message sizes (where the overhead of the Python interpreter has less impact). Latency is on par with the Java client.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's a high priority for us that client features keep pace with core Apache Kafka and the components of the Confluent Platform.

See the API documentation for more info.

Usage

Below are some examples of typical usage. For more examples, see the examples directory or the confluentinc/examples GitHub repo for a Confluent Cloud example.

Producer

from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

High-level Consumer

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

AvroProducer

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

AvroConsumer

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

AdminClient

Create topics:

from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'mybroker'})

new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability.

# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = a.create_topics(new_topics)

# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Thread Safety

The Producer, Consumer and AdminClient are all thread safe.
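
For example, a single Producer instance can safely be shared by multiple producing threads. The following is a minimal sketch only; the broker address, topic name and message contents are placeholders:

from threading import Thread

from confluent_kafka import Producer

# One Producer instance shared by all threads (the clients are thread safe).
p = Producer({'bootstrap.servers': 'mybroker'})

def produce_from_thread(worker_id):
    for i in range(10):
        # produce() and poll() may be called concurrently from multiple threads.
        p.produce('mytopic', 'worker {} message {}'.format(worker_id, i).encode('utf-8'))
        p.poll(0)

threads = [Thread(target=produce_from_thread, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Wait for any outstanding messages to be delivered.
p.flush()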

Install

Install self-contained binary wheels

$ pip install confluent-kafka

NOTE: The pre-built Linux wheels do NOT contain SASL Kerberos/GSSAPI support. If you need SASL Kerberos/GSSAPI support you must install librdkafka and its dependencies using the repositories below and then build confluent-kafka using the command in the "Install from source from PyPI" section below.

Install AvroProducer and AvroConsumer

$ pip install "confluent-kafka[avro]"

Install from source from PyPI (requires librdkafka + dependencies to be installed separately):

$ pip install --no-binary :all: confluent-kafka

For source install, see Prerequisites below.

Broker Compatibility

The Python client (as well as the underlying C library, librdkafka) supports all broker versions >= 0.8. However, due to the nature of the Kafka protocol in broker versions 0.8 and 0.9, it is not safe for a client to assume which protocol version the broker actually supports, so you need to hint the Python client which protocol version it may use. This is done through two configuration settings:

  • broker.version.fallback=YOUR_BROKER_VERSION (default 0.9.0.1)
  • api.version.request=true|false (default true)

When using a Kafka 0.10 broker or later you don't need to do anything (api.version.request=true is the default). If you use a Kafka 0.9 or 0.8 broker you must set api.version.request=false and set broker.version.fallback to your broker version, e.g. broker.version.fallback=0.9.0.1.
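
For example, a consumer connecting to a 0.9.0.1 broker could be configured roughly as follows (the broker address and group id are placeholders):

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    # Only needed for 0.8/0.9 brokers; 0.10 and later are detected automatically.
    'api.version.request': False,
    'broker.version.fallback': '0.9.0.1'
})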

More info here: https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility

SSL certificates

If you're connecting to a Kafka cluster through SSL you will need to configure the client with 'security.protocol': 'SSL' (or 'SASL_SSL' if SASL authentication is used).

The client will use CA certificates to verify the broker's certificate. The embedded OpenSSL library will look for CA certificates in /usr/lib/ssl/certs/ or /usr/lib/ssl/cacert.pem. CA certificates are typically provided by the Linux distribution's ca-certificates package, which needs to be installed through apt, yum, et al.

If your system stores CA certificates in another location you will need to configure the client with 'ssl.ca.location': '/path/to/cacert.pem'.

Alternatively, the CA certificates can be provided by the certifi Python package. To use certifi, add an import certifi line and configure the client's CA location with 'ssl.ca.location': certifi.where().
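
Putting this together, a client connecting over SSL might be configured roughly as follows (the broker address is a placeholder; use either a CA file path or certifi, whichever applies):

import certifi

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'mybroker:9093',
    'security.protocol': 'SSL',
    # Either point at a CA bundle on disk:
    # 'ssl.ca.location': '/path/to/cacert.pem',
    # ... or use the CA bundle shipped with the certifi package:
    'ssl.ca.location': certifi.where()
})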

Prerequisites

  • Python >= 2.7 or Python 3.x
  • librdkafka >= 1.4.0 (latest release is embedded in wheels)

librdkafka is embedded in the macOS and manylinux wheels. For other platforms, when SASL Kerberos/GSSAPI support is needed, or when a specific version of librdkafka is desired, install librdkafka separately and build confluent-kafka-python from source (see the Install section above).

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-python. confluent-kafka-python has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

Instructions on building and testing confluent-kafka-python can be found here.

Comments
  • How to Integrate Keytab/truststore/jass.conf/krb5.conf

    Is there a way to integrate the usage of these files along with the producer code using Python? (In Java this has been done using system properties.)

    Files: krb5.conf kafka.client.jaas.conf truststore.jks keytab file

    question 
    opened by bgayathri 56
  • Consumer Keeps Resets to -1001 and difference between topic offset and consumer offset?

    Description

    So I'm currently troubleshooting my consumer and trying different things with partitions. I have 3 partitions and a consumer for each partition. I initially had an issue where the commit offset would be -1001, but I figured that was only because of the timeout. So I put in code to reset it to 0 if <0, but now every time I rerun the consumer it always returns -1001 as my offset. Is there a way to find the latest commit of a particular topic partition? And also, what is the difference between topic and consumer offset?

    Thanks in advance

    How to reproduce

    consumer = Consumer({'bootstrap.servers': KAFKA_BROKER,
    ... 'group.id': 'maxwellConsumer','debug':'broker,fetch',
    ... 'enable.auto.commit': True,
    ... 'default.topic.config': {'auto.offset.reset': 'latest'}}) %7|1513895424.653|BRKMAIN|rdkafka#consumer-2| [thrd::0/internal]: :0/internal: Enter main broker thread %7|1513895424.653|STATE|rdkafka#consumer-2| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP %7|1513895424.654|BROKER|rdkafka#consumer-2| [thrd:app]: 172.31.230.234:9092/bootstrap: Added new broker with NodeId -1 %7|1513895424.654|BRKMAIN|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Enter main broker thread %7|1513895424.654|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: broker in state INIT connecting %7|1513895424.654|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Connecting to ipv4#172.31.230.234:9092 (plaintext) with socket 7 %7|1513895424.654|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Broker changed state INIT -> CONNECT %7|1513895424.654|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Connected to ipv4#172.31.230.234:9092 %7|1513895424.654|CONNECTED|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Connected (#1) %7|1513895424.654|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion %7|1513895424.654|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY %7|1513895424.655|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2 %7|1513895424.655|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP %7|1513895424.655|BROKER|rdkafka#consumer-2| [thrd:main]: 192.23.213.130:9092/2: Added new broker with NodeId 2 %7|1513895424.655|BRKMAIN|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Enter main broker thread %7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: broker in state INIT connecting %7|1513895424.655|BROKER|rdkafka#consumer-2| [thrd:main]: 172.31.230.155:9092/1: Added new broker with NodeId 1 %7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Connecting to ipv4#192.23.213.130:9092 (plaintext) with socket 10 %7|1513895424.655|CLUSTERID|rdkafka#consumer-2| [thrd:main]: 172.31.230.234:9092/bootstrap: ClusterId update "" -> "LlhmfovJSe-sOmvvhbrI7w" %7|1513895424.655|UPDATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: NodeId changed from -1 to 0 %7|1513895424.655|UPDATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Name changed from 172.31.230.234:9092/bootstrap to 172.31.230.234:9092/0 %7|1513895424.655|LEADER|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Mapped 0 partition(s) to broker %7|1513895424.655|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Broker changed state UP -> UPDATE %7|1513895424.655|STATE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Broker 
changed state INIT -> CONNECT %7|1513895424.655|BRKMAIN|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Enter main broker thread %7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: broker in state INIT connecting %7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Connecting to ipv4#172.31.230.155:9092 (plaintext) with socket 13 %7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Broker changed state INIT -> CONNECT %7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Broker changed state UPDATE -> UP %7|1513895424.656|CONNECT|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Connected to ipv4#192.23.213.130:9092 %7|1513895424.656|CONNECTED|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Connected (#1) %7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Updated enabled protocol features +ApiVersion to ApiVersion %7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Broker changed state CONNECT -> APIVERSION_QUERY %7|1513895424.656|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Connected to ipv4#172.31.230.155:9092 %7|1513895424.656|CONNECTED|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Connected (#1) %7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Updated enabled protocol features +ApiVersion to ApiVersion %7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Broker changed state CONNECT -> APIVERSION_QUERY %7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2 %7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Broker changed state APIVERSION_QUERY -> UP %7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2 %7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Broker changed state APIVERSION_QUERY -> UP

    topic = TopicPartition('elastic',50) print "Topic offset",topic.offset Topic offset -1001 consumer.assign([topic]) %7|1513895439.655|TOPBRK|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Topic elastic [2]: joining broker (rktp 0x7f9ffc004e70) %7|1513895439.655|TOPBRK|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Topic elastic [0]: joining broker (rktp 0x7f9ffc0045f0) %7|1513895439.655|TOPBRK|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Topic elastic [1]: joining broker (rktp 0x7f9ffc0049d0)

    confluent_kafka.version() - ('0.11.0', 720896) confluent_kafka.libversion() - ('0.11.0', 721151) Broker version 0.9.0.1 OS: ubuntu

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    • [x] Apache Kafka broker version:
    • [x] Client configuration: {...}
    • [x] Operating system:
    • [x] Provide client logs (with 'debug': '..' as necessary)
    • [x] Provide broker log excerpts
    • [ ] Critical issue
    wait info 
    opened by rohitgcs 32
  • Support win32

    Now that librdkafka 0.9.4 RC1 supports compiling to win32 very easily, and a .NET Kafka SDK is out there, it would be nice if confluent-kafka-python added support for compiling on Windows.

    I've compiled librdkafka 0.9.4 RC1 locally. I'm now stuck at installing the confluent-kafka-python-0.9.4-RC1 source with pip against it. Python 3.6 + Visual Studio 2015 solve the missing inttypes.h problems, but I'm stuck at the link phase. The Python library requires a static library file (rdkafka.lib); if I change the librdkafka project to produce a .lib file, the pip install stops at the following error:

    Creating library build\temp.win-amd64-3.6\Release\confluent_kafka/src\cimpl.cp36-win_amd64.lib and object build\temp.win-amd64-3.6\Release\confluent_kafka/src\cimpl.cp36-win_amd64.exp
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_conf_set_opaque
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_conf_destroy
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_partition_list_add
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_new
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_msg_partitioner_consistent_random
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_set_stats_cb
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_msg_partitioner_random
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_get_err_descs
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_conf_set_partitioner_cb
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_msg_partitioner_consistent
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_partition_list_destroy
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_version
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_version_str
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_set_error_cb
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_conf_new
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_set_opaque
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_err2name
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_conf_set
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_err2str
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_set_default_topic_conf
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_message_timestamp
    confluent_kafka.obj : error LNK2001: unresolved external symbol strcasecmp
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_yield
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_name
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_set
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_topic_partition_list_new
    confluent_kafka.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_conf_destroy
    Producer.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_poll
    Producer.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_destroy
    Producer.obj : error LNK2001: unresolved external symbol __imp_rd_kafka_last_error
    

    Environment: Windows 7 & Visual Studio Community 2015, Python 3.6 (Anaconda 4.3.0), librdkafka 0.9.4 RC1, confluent-kafka-python-0.9.4-RC1 / confluent-kafka-python-0.9.2 (same error)

    enhancement installation FAQ 
    opened by wolvever 30
  • Is producer.flush() a must?

    My multi-threaded producer doesn't seem to be sending any messages if flush() is NOT called at the end. This is my script:

    conf = {'bootstrap.servers': 'localhost:9092',
            'queue.buffering.max.messages': 1000000,
            'queue.buffering.max.ms': 500,
            'batch.num.messages': 50,
            'default.topic.config': {'acks': 'all'}}

    producer = confluent_kafka.Producer(**conf)

    try:
        fh = open(os.path.join("/home/samples/samples", queue.get()), "r")
        while True:
            data = fh.read(10240)
            if data == '':
                fh.close()
                break
            try:
                producer.produce(topic, value=data, callback=self.delivery_callback)
                producer.poll(0)
            except BufferError as e:
                print "Buffer full"
                producer.produce(topic, value=data, callback=self.delivery_callback)
                producer.poll(0)
            #print "Waiting for %d deliveries\n" % len(producer)
            #producer.flush()
    except IOError as e:
        print "IO error"
    except ValueError:
        print "Conversion error"
    except:
        print "unexpected error"
        raise
    queue.task_done()

    Adding flush() increases the run time drastically. Is it a must? Is there any other way I can make sure all the messages have reached the topics?

    opened by smj19 26
  • Python 3.10 wheel build for confluent-kafka 1.7.0

    Description

    Python 3.10 has been released, but the latest confluent-kafka release, 1.7.0, doesn't have a prebuilt wheel for it.

    It is easy to build it from source but it slows down some of the image build procedures quite a bit.

    How to reproduce

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion())
    • [x] Apache Kafka broker version:
    • [x] Client configuration: {...}: None
    • [x] Operating system: Linux
    • [x] Provide client logs (with 'debug': '..' as necessary): none
    • [x] Provide broker log excerpts: none
    • [ ] Critical issue: No
    installation 
    opened by Gyllsdorff 25
  • fastavro + confluent-kafka avro producer throws TypeError: unhashable type: 'list'

    Description

    I have a simple confluent kafka avro producer as follows:

    import sys
    import uuid
    from datetime import datetime
    
    import fastavro as fastavro
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    def delivery_callback(err, msg):
        if err:
            sys.stderr.write("%% Message failed delivery: %s\n" % err)
        else:
            sys.stderr.write(
                "%% Message delivered to %s [%d] @ %d\n"
                % (msg.topic(), msg.partition(), msg.offset())
            )
    
    
    if __name__ == "__main__":
        bootstrap_servers = "127.0.0.1:39092"
        schema_registry_servers = "http://127.0.0.1:48081"
    
        schema_registry_conf = {"url": schema_registry_servers}
    
        key_schema_string = """
        {"type": "string"}
        """
        config = {
            "bootstrap.servers": bootstrap_servers,
            "on_delivery": delivery_callback,
            "schema.registry.url": schema_registry_servers,
        }
    
        key_schema = avro.loads(key_schema_string)
        value_schema = fastavro.schema.load_schema("data/Employee.avsc")
        print(value_schema)
    
        avro_producer = AvroProducer(
            config=config, default_key_schema=key_schema, default_value_schema=value_schema
        )
    
        avro_producer.produce(
            topic="employees",
            key=str(uuid.uuid4()),
            value=dict(
                id=1,
                name="Felix",
                role="Eng",
                company="Philips",
                eventDate=datetime.utcnow(),
                address=dict(street="Murrysville", zip=15668),
            ),
        )
    
        avro_producer.flush()
    

    throws following exception:

    > Traceback (most recent call last):
    >   File "/PycharmProjects/ConfluentAvroKafkaProducer/employee_producer.py", line 55, in <module>
    >     address=dict(street="Murrysville", zip=15668),
    >   File "/PycharmProjects/ConfluentAvroKafkaProducer/.venv/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 99, in produce
    >     value = self._serializer.encode_record_with_schema(topic, value_schema, value)
    >   File "/PycharmProjects/ConfluentAvroKafkaProducer/.venv/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 108, in encode_record_with_schema
    >     schema_id = self.registry_client.register(subject, schema)
    >   File "/PycharmProjects/ConfluentAvroKafkaProducer/.venv/lib/python3.7/site-packages/confluent_kafka/avro/cached_schema_registry_client.py", line 209, in register
    >     schema_id = schemas_to_id.get(avro_schema, None)
    > TypeError: unhashable type: 'list'
    

    Note: I am attaching the source code, including pyproject.toml. Please find the source code here: ConfluentAvroKafkaProducer.zip

    How to reproduce

    Just run employee_producer to publish employee data to the employees topic. Note: you need to have Kafka + Schema Registry running.

    Checklist

    Please provide the following information:

    • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):

    • [ ] Apache Kafka broker version: confluentinc/cp-enterprise-kafka:5.1.2

    • [ ] Client configuration: {...}

    • [ ] Operating system: MacOS

    • [ ] Provide client logs (with 'debug': '..' as necessary)

    • [ ] Provide broker log excerpts

    • [ ] Critical issue

    opened by FelixKJose 25
  • Kafka consumer performance issues inside a single process

    Description

    For our use case, we need to read messages from a Kafka topic in batches. Each batch is defined by a partition and an interval of starting and ending offsets, as shown in the script below. These batches are then executed in a thread pool or process pool to consume the messages between the starting offset and the last offset of that interval.

    The script for generating batches and consuming the messages using ProcessPoolExecutor/ThreadPoolExecutor:

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from time import time
    import random
    
    import confluent_kafka as ck
    
    def r(v):
            part_no, low, high = v
            consume_batch_size = 10000
            
            topic = 'topic-1'
            conf = {
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'my-group'
            }
            
            c = ck.Consumer(conf)
            tp = ck.TopicPartition(topic, part_no, low)
            c.assign([tp])
            
            start_offset = low
            end_offset = low
    
            while high > low:
                curr_high = min(low + consume_batch_size, high)
                for m in c.consume(curr_high-low):
                    end_offset = max(end_offset, m.offset())
                    m.value()
                low = curr_high
                
            return "{}-{}".format(start_offset , end_offset)
    
    def main():
        topic = 'topic-1'
        batch_size = 100000
        consumers = {}
        
        # to define the initial order of batches
        shuffle = False
        interleave = False
        if interleave:
            shuffle = False
    
        conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'my-group'
        }
    
        start = time()
        master_consumer = ck.Consumer(conf)
        parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions
    
        total_offset = [0]
                    
        parts = []
        for part_no in range(len(parts_metadata)):
            low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
            index = part_no
            while high > low:
                curr_high = min(low + batch_size, high)
                if interleave:
                    parts.insert(index, (part_no, low, curr_high))
                    index += (part_no+1)
                else:
                    parts.append((part_no, low, curr_high))
                low = curr_high
        print('Partition set count:', len(parts))
        
        if shuffle:
            random.shuffle(parts)
    
        futures = []
        with ThreadPoolExecutor(max_workers=len(parts_metadata)) as e:
            for p in parts:
                futures.append(e.submit(r, p))
        
        [f.result() for f in futures]        
                
        took = time() - start
        count = sum(x[2]-x[1] for x in parts)
        rate = (count / 1024) / took
        print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')
    
    if __name__ == '__main__':
        main()
    

    I need to vary consume_batch_size (the number of messages to be consumed per consumer.consume call) in the above script to get maximum throughput in each case. Below are the maximum throughputs in both cases and their corresponding consume_batch_size.

    Results: With ProcessPoolExecutor (consume_batch_size = 100000): read: 22806000 msgs 23353344000 b in: 14.148 s rate: 1574.17 mb/s With ThreadPoolExecutor (consume_batch_size = 10000): read: 22806000 msgs 23353344000 b in: 59.965 s rate: 371.409 mb/s

    I am trying to understand the reason behind the drop in performance when using a single process. Is it because of the Python GIL, or some Kafka consumer configuration issue?

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version(1.0.0) and confluent_kafka.libversion(1.0.0)):
    • [x] Apache Kafka broker version: 2.2.0
    • [ ] Client configuration: {...}
    • [ ] Operating system:
    • [ ] Provide client logs (with 'debug': '..' as necessary)
    • [ ] Provide broker log excerpts
    • [ ] Critical issue
    opened by skmatti 25
  • Application maximum poll interval (300000ms) exceeded by 88ms

    Description

    When I use subprocess.Popen in a Flask project to launch a script (the script instantiates the consumer object) that pulls messages (using the consume and poll APIs), the consumer hangs after pulling part of the data, and after waiting around 60000 ms it reports this error.

    Application maximum poll interval (300000ms) exceeded by 88ms(adjust max.poll.interval.ms for long-running message processing): leaving group

    Please help me to see how to solve it.

    How to reproduce

    Here are some of the code blocks in my script.


    consumer = Consumer({'bootstrap.servers': "xxx:xxx.xxx.xxx:9092",
                         "group.id": "xxx",
                         # 'enable.auto.commit': False,
                         "fetch.wait.max.ms": 3000,
                         # 'fetch.min.bytes': 1024 * 512,
                         'session.timeout.ms': 6000,
                         "on_commit": _on_send_response,
                         "default.topic.config": {"auto.offset.reset": "earliest"}
                         })

    pt = TopicPartition("data_checksum2", 0)
    consumer.assign([pt])
    try:
        while True:
            filter_dict2 = filter_dict
            # time.sleep()
            msg = consumer.poll(0.2)
            if msg:
                table = json.loads(msg.value().decode("utf-8")).get('table')
                if table in filter_dict2:
                    # gtid = json.loads((msg.value().decode("utf-8"))).get('gtid')
                    # offset = msg.offset()
                    print(json.loads((msg.value().decode("utf-8"))).get('es'), )
                    try:
                        producer.produce(str(filter_dict2.get(table)), msg.value())
                    except Exception:
                        producer.poll(0)
                        producer.flush(1)
                elif msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        aelog.error(msg.error())
            else:
                continue
    except Exception as e:
        print(e)
    finally:
        try:
            aelog.info("Synchronous commit remediation succeeded")
            print(consumer.committed([pt]))
        except KafkaError as e:
            consumer.close()
    

    Checklist

    Please provide the following information:

    • [0.11.6 ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    • [2.1.0 ] Apache Kafka broker version:
    • [ ] Client configuration: {...}
    • [ ] Operating system:
    • [ ] Provide client logs (with 'debug': '..' as necessary)
    • [ ] Provide broker log excerpts
    • [ ] Critical issue

    MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 88ms (adjust max.poll.interval.ms for long-running message processing): leaving group

    wait info 
    opened by ybbiubiubiu 25
  • first attempt at batch consumption of messages

    This is a first attempt at adding batch consumption, a la https://github.com/confluentinc/confluent-kafka-python/issues/252

    Have not done a ton of testing yet, but I am able to retrieve messages. Let me know what you think.

    opened by tburmeister 25
  • confluent_kafka/src/confluent_kafka.h:21:10: fatal error: 'librdkafka/rdkafka.h' file not found

    Can anyone help me with this error?

    $sudo -H /usr/local/bin/pip install confluent-kafka Collecting confluent-kafka Downloading confluent-kafka-0.9.4.tar.gz (40kB) 100% |████████████████████████████████| 40kB 1.3MB/s Building wheels for collected packages: confluent-kafka Running setup.py bdist_wheel for confluent-kafka ... error Complete output from command /usr/local/opt/python/bin/python2.7 -u -c "import setuptools, tokenize;file='/private/tmp/pip-build-qMzR09/confluent-kafka/setup.py';f=getattr(tokenize, 'open', open)(file);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, file, 'exec'))" bdist_wheel -d /tmp/tmp_c59HNpip-wheel- --python-tag cp27: running bdist_wheel running build running build_py creating build creating build/lib.macosx-10.11-x86_64-2.7 creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka copying confluent_kafka/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro copying confluent_kafka/avro/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro copying confluent_kafka/avro/cached_schema_registry_client.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/verifiable_client.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/verifiable_consumer.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/verifiable_producer.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro/serializer copying confluent_kafka/avro/serializer/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro/serializer copying confluent_kafka/avro/serializer/message_serializer.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro/serializer running build_ext building 'confluent_kafka.cimpl' extension creating build/temp.macosx-10.11-x86_64-2.7 creating build/temp.macosx-10.11-x86_64-2.7/confluent_kafka creating build/temp.macosx-10.11-x86_64-2.7/confluent_kafka/src clang -fno-strict-aliasing -fno-common -dynamic -g -O2 -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -I/usr/local/include -I/usr/local/opt/openssl/include -I/usr/local/opt/sqlite/include -I/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/include/python2.7 -c confluent_kafka/src/confluent_kafka.c -o build/temp.macosx-10.11-x86_64-2.7/confluent_kafka/src/confluent_kafka.o In file included from confluent_kafka/src/confluent_kafka.c:17: confluent_kafka/src/confluent_kafka.h:21:10: fatal error: 'librdkafka/rdkafka.h' file not found #include <librdkafka/rdkafka.h> ^ 1 error generated. error: command 'clang' failed with exit status 1


    Failed building wheel for confluent-kafka Running setup.py clean for confluent-kafka Failed to build confluent-kafka Installing collected packages: confluent-kafka Running setup.py install for confluent-kafka ... error Complete output from command /usr/local/opt/python/bin/python2.7 -u -c "import setuptools, tokenize;file='/private/tmp/pip-build-qMzR09/confluent-kafka/setup.py';f=getattr(tokenize, 'open', open)(file);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, file, 'exec'))" install --record /tmp/pip-VSGjNS-record/install-record.txt --single-version-externally-managed --compile: running install running build running build_py creating build creating build/lib.macosx-10.11-x86_64-2.7 creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka copying confluent_kafka/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro copying confluent_kafka/avro/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro copying confluent_kafka/avro/cached_schema_registry_client.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/verifiable_client.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/verifiable_consumer.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest copying confluent_kafka/kafkatest/verifiable_producer.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/kafkatest creating build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro/serializer copying confluent_kafka/avro/serializer/init.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro/serializer copying confluent_kafka/avro/serializer/message_serializer.py -> build/lib.macosx-10.11-x86_64-2.7/confluent_kafka/avro/serializer running build_ext building 'confluent_kafka.cimpl' extension creating build/temp.macosx-10.11-x86_64-2.7 creating build/temp.macosx-10.11-x86_64-2.7/confluent_kafka creating build/temp.macosx-10.11-x86_64-2.7/confluent_kafka/src clang -fno-strict-aliasing -fno-common -dynamic -g -O2 -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -I/usr/local/include -I/usr/local/opt/openssl/include -I/usr/local/opt/sqlite/include -I/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/include/python2.7 -c confluent_kafka/src/confluent_kafka.c -o build/temp.macosx-10.11-x86_64-2.7/confluent_kafka/src/confluent_kafka.o In file included from confluent_kafka/src/confluent_kafka.c:17: confluent_kafka/src/confluent_kafka.h:21:10: fatal error: 'librdkafka/rdkafka.h' file not found #include <librdkafka/rdkafka.h> ^ 1 error generated. error: command 'clang' failed with exit status 1

    ----------------------------------------
    

    Command "/usr/local/opt/python/bin/python2.7 -u -c "import setuptools, tokenize;file='/private/tmp/pip-build-qMzR09/confluent-kafka/setup.py';f=getattr(tokenize, 'open', open)(file);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, file, 'exec'))" install --record /tmp/pip-VSGjNS-record/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /private/tmp/pip-build-qMzR09/confluent-kafka/

    installation 
    opened by datteswararao 25
  • consumers sometimes sees message timestamp as -1

    confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, Ubuntu 14.04, kafka 0.10.1.0

    All producers run the same code and run on similar hosts. The consumer uses kafka-python==1.3.1 (instead of confluent-kafka), and record.timestamp is sometimes OK and sometimes (quite often) -1.

    wait info 
    opened by fillest 25
  • Document that offsets_for_times accepts milliseconds

    Other confluent-kafka-python APIs accept seconds since epoch, but this one uses milliseconds, so it's easy to think it accepts seconds. That would generally result in replaying all events.
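
    For illustration only (not part of the original report), a minimal sketch of converting a seconds-since-epoch timestamp into the milliseconds that offsets_for_times() expects; the broker, group and topic names are placeholders:

    import time

    from confluent_kafka import Consumer, TopicPartition

    c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup'})

    one_hour_ago_s = time.time() - 3600
    # The timestamp is passed in the TopicPartition offset field, in milliseconds.
    tp = TopicPartition('mytopic', 0, int(one_hour_ago_s * 1000))
    print(c.offsets_for_times([tp], timeout=10))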

    opened by timmc-edx 2
  • Unable to catch timeout/refused errors on the client

    Related issue : https://github.com/confluentinc/confluent-kafka-python/issues/1222#issuecomment-1038753418

    Description

    When the client is polling for new messages, the following errors might happen.

    FAIL [rdkafka#consumer-1] [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
    
    FAIL [rdkafka#consumer-3] [thrd:sasl_ssl://foo/3]: foo/3: 1 request(s) timed out: disconnect (after 154583ms in state UP, 1 identical error(s) suppressed)“
    

    I am trying to silence these errors or handle them in a different way.
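
    One possible approach (an assumption, not taken from the issue itself) is to register an error_cb in the client configuration so that such transport-level errors are passed to your own handler instead of being logged by the client; a minimal sketch:

    from confluent_kafka import Consumer, KafkaError

    def on_error(err):
        # Connection-level errors (e.g. _TRANSPORT, _ALL_BROKERS_DOWN) arrive here;
        # silence them, log them at a lower level, or escalate as needed.
        if err.code() not in (KafkaError._TRANSPORT, KafkaError._ALL_BROKERS_DOWN):
            print('Kafka error: {}'.format(err))

    c = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'mygroup',
        'error_cb': on_error
    })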

    How to reproduce

    Use the below script to reproduce the error.

    https://github.com/ChillarAnand/avilpage.com/blob/master/scripts/confluent_kafka_consumer.py

    To avoid printing these messages, the context manager in the script can be used for now.

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    • [x] Apache Kafka broker version:
    • [x] Client configuration: {...}
    • [x] Operating system: macOS
    • [x] Provide client logs (with 'debug': '..' as necessary)
    • [x] Provide broker log excerpts - None
    • [x] Critical issue - No
    >>> import confluent_kafka
    >>> confluent_kafka.version()
    ('1.7.0', 17235968)
    >>> confluent_kafka.libversion()
    ('1.9.2', 17367807)
    
    opened by ChillarAnand 0
  • Drop support for Python 2

    Originally posted by @chriselion in https://github.com/confluentinc/confluent-kafka-python/issues/1262#issuecomment-1012652420

    Python 3.2 seems to be the minimum required version, as functools.total_ordering is used in some places.

    If this is OK, I can go through the codebase and remove obsolete code related to Python 2 compatibility. This would make it possible for me to catch anything that would require a higher version of Python.

    opened by Viicos 1
  • Problem with producer - missing attribute

    Description

    I have a problem with creating my own producer using confluent-kafka-python. When I try to run the producer I get an error that FileDescriptor has no attribute 'containing_type'.

    Details: Operating system: Windows 10 22H2

    Python packages versions: libprotoc 3.21.9 protobuf 4.21.12 google 3.0.0 confluent-kafka 1.9.2

    The error which I'm getting:

    Traceback (most recent call last):
      File "D:/Program Files/PyCharm/PyCharm Community Edition 2022.2.2/plugins/python-ce/helpers/pydev/pydevd.py", line 1496, in _exec
        pydev_imports.execfile(file, globals, locals)  # execute the script
      File "D:\Program Files\PyCharm\PyCharm Community Edition 2022.2.2\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
        exec(compile(contents+"\n", file, 'exec'), glob, loc)
      File "D:\Projekty\PassiveHome.Endpoint\protobuf_producer.py", line 80, in <module>
        main()
      File "D:\Projekty\PassiveHome.Endpoint\protobuf_producer.py", line 37, in main
        protobuf_serializer = ProtobufSerializer(message_pb2,
      File "D:\Projekty\PassiveHome.Endpoint\venv\lib\site-packages\confluent_kafka\schema_registry\protobuf.py", line 291, in __init__
        self._msg_index = _create_msg_index(descriptor)
      File "D:\Projekty\PassiveHome.Endpoint\venv\lib\site-packages\confluent_kafka\schema_registry\protobuf.py", line 92, in _create_msg_index
        while current.containing_type is not None:
    AttributeError: 'google._upb._message.FileDescriptor' object has no attribute 'containing_type'
    

    How to reproduce

    How the .proto file looks

    syntax = "proto3";
    
    import "google/protobuf/timestamp.proto";
    package PassiveHome.Endpoint;
    
    message Message {
      int32 Id = 1;
      int32 EndpointId = 2;
      repeated AnalyzerMessage AnalyzerMessages = 3;
    
      message AnalyzerMessage{
        int32 AnalyzerId = 1;
        repeated SensorData SensorList =2;
    
        message SensorData {
          uint32 SensorId =1;
          float SensorValue =2;
          uint32 SensorType =3;
          google.protobuf.Timestamp CreatedAt =4;
        }
      }
    }
    

    Generated message_pb2.py

    # -*- coding: utf-8 -*-
    # Generated by the protocol buffer compiler.  DO NOT EDIT!
    # source: message.proto
    """Generated protocol buffer code."""
    from google.protobuf.internal import builder as _builder
    from google.protobuf import descriptor as _descriptor
    from google.protobuf import descriptor_pool as _descriptor_pool
    from google.protobuf import symbol_database as _symbol_database
    # @@protoc_insertion_point(imports)
    
    _sym_db = _symbol_database.Default()
    
    
    from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
    
    
    DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rmessage.proto\x12\x14PassiveHome.Endpoint\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe0\x02\n\x07Message\x12\n\n\x02Id\x18\x01 \x01(\x05\x12\x12\n\nEndpointId\x18\x02 \x01(\x05\x12G\n\x10\x41nalyzerMessages\x18\x03 \x03(\x0b\x32-.PassiveHome.Endpoint.Message.AnalyzerMessage\x1a\xeb\x01\n\x0f\x41nalyzerMessage\x12\x12\n\nAnalyzerId\x18\x01 \x01(\x05\x12L\n\nSensorList\x18\x02 \x03(\x0b\x32\x38.PassiveHome.Endpoint.Message.AnalyzerMessage.SensorData\x1av\n\nSensorData\x12\x10\n\x08SensorId\x18\x01 \x01(\r\x12\x13\n\x0bSensorValue\x18\x02 \x01(\x02\x12\x12\n\nSensorType\x18\x03 \x01(\r\x12-\n\tCreatedAt\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3')
    
    _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
    _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'message_pb2', globals())
    if _descriptor._USE_C_DESCRIPTORS == False:
    
      DESCRIPTOR._options = None
      _MESSAGE._serialized_start=73
      _MESSAGE._serialized_end=425
      _MESSAGE_ANALYZERMESSAGE._serialized_start=190
      _MESSAGE_ANALYZERMESSAGE._serialized_end=425
      _MESSAGE_ANALYZERMESSAGE_SENSORDATA._serialized_start=307
      _MESSAGE_ANALYZERMESSAGE_SENSORDATA._serialized_end=425
    # @@protoc_insertion_point(module_scope)
    
    

    Producer code

    import argparse
    from uuid import uuid4
    
    from six.moves import input
    
    # Protobuf generated class; resides at ./protobuf/user_pb2.py
    import protogenerated.message_pb2 as message_pb2
    from confluent_kafka import Producer
    from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
    
    
    def delivery_report(err, msg):
        """
        Reports the failure or success of a message delivery.
    
        Args:
            err (KafkaError): The error that occurred on None on success.
            msg (Message): The message that was produced or failed.
        """
    
        if err is not None:
            print("Delivery failed for User record {}: {}".format(msg.key(), err))
            return
        print('User record {} successfully produced to {} [{}] at offset {}'.format(
            msg.key(), msg.topic(), msg.partition(), msg.offset()))
    
    
    def main():
        topic = "testTopic"
    
        schema_registry_conf = {'url': "http://schemaregistry:8545"}
        schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    
        string_serializer = StringSerializer('utf8')
        protobuf_serializer = ProtobufSerializer(message_pb2,
                                                 schema_registry_client,
                                                 {'use.deprecated.format': False})
    
    
        producer_conf = {'bootstrap.servers': "localhost:19092"}
    
        producer = Producer(producer_conf)
    
        print("Producing user records to topic {}. ^C to exit.".format(topic))
        while True:
            # Serve on_delivery callbacks from previous calls to produce()
            producer.poll(0.0)
            try:
                user_name = input("Enter name: ")
                user_favorite_number = int(input("Enter favorite number: "))
                user_favorite_color = input("Enter favorite color: ")
    
                message = message_pb2.Message()
                message.Id = 1
                message.EndpointId = 1
                analyzerMessages = message.AnalyzerMessages.add()
                analyzerMessages.AnalyzerId = 1
                sensorList = analyzerMessages.SensorList.add()
                sensorList.SensorId = 1
                sensorList.SensorValue = float(1)
                sensorList.SensorType = 1
    
                producer.produce(topic=topic, partition=0,
                                 key=string_serializer(str(uuid4())),
                                 value=protobuf_serializer(message, SerializationContext(topic, MessageField.VALUE)),
                                 on_delivery=delivery_report)
            except (KeyboardInterrupt, EOFError):
                break
            except ValueError:
                print("Invalid input, discarding record...")
                continue
    
        print("\nFlushing records...")
        producer.flush()
    
    
    if __name__ == '__main__':
        main()
    
    

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    • [ ] Apache Kafka broker version:
    • [x] Client configuration: {...}
    • [x] Operating system:
    • [ ] Provide client logs (with 'debug': '..' as necessary)
    • [ ] Provide broker log excerpts
    • [ ] Critical issue
    opened by exo0 2
  • oauth token not refreshing

    Description

    In my project, I am using confluent-kafka-python-1.9.2 to consume and produce messages on a Kafka topic. The OAuth provider is already set up by another team and the token expires after 30 minutes. In my code I have used this configuration for the consumer.

    config = {'bootstrap.servers': '<server-url>:9093', 
    'group.id': 'consumer-group', 
    'auto.offset.reset': 'latest',
    'queued.max.messages.kbytes': 100000,
    'enable.auto.commit': False,
    'fetch.min.bytes': 100000,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER', 
    'sasl.oauthbearer.config': 'oauth_cb',
    'oauth_cb': functools.partial(_get_token,<client_id>,<client_secret>), 
    }
    

    _get_token function:

    def _get_token(client_id, client_secret, oauth_config):
        realm = KeycloakRealm(
            server_url="<auth_server_url>",
            realm_name="<realm_name>",
        )
        oidc_client = realm.open_id_connect(
            client_id=client_id,
            client_secret=client_secret,
        )
        client_credentials = oidc_client.client_credentials()
        access_token = client_credentials["access_token"]
        expires_in = client_credentials["expires_in"]
        print(client_credentials)
        return access_token, time.time() + expires_in
    

    Consumer code:

    #.....
    config.update({"key.deserializer": key_deserializer,
                              "value.deserializer": value_deserializer,})
    consumer = DeserializingConsumer(config)
    while True:
            message = self._client.poll(timeout) if timeout else self._client.poll()
            if message is None:
                  logger.debug("No messages found")
                  continue
            message_error = message.error()
            if message_error:
                  logger.error(message_error)
           # processing message code 
    

    So while running the consumer, it can fetch the token the first time and can consume messages without any issue, but as the token expires after 30 minutes, I start getting the following error.

    confluent_kafka.error.ConsumeError: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Fetch from broker 31 failed: Broker: Topic authorization failed"}

    Even if I set expires_in to 30 seconds or 1 minute, I still get the above error. What I don't understand is that _get_token is called every minute, yet after 30 minutes I still get the above error.

    I also tried to set oauthbearer_token_refresh_cb but got this error:

    cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}

    So I would like to know how to refresh token?

    How to reproduce

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    • [ ] Apache Kafka broker version:
    • [x] Client configuration: {...}
    • [ ] Operating system:
    • [x] Provide client logs (with 'debug': '..' as necessary)
    • [ ] Provide broker log excerpts
    • [ ] Critical issue
    opened by sms1190 0
Releases(v1.9.2)
  • v1.9.2(Aug 3, 2022)

    confluent-kafka-python v1.9.2

    v1.9.2 is a maintenance release with the following fixes and enhancements:

    • Support for setting principal and SASL extensions in oauth_cb and handle failures (@Manicben, #1402)
    • Wheel for macOS M1/arm64
    • KIP-140 Admin API ACL fix: When requesting multiple create_acls or delete_acls operations, if the provided ACL bindings or ACL binding filters are not unique, an exception will be thrown immediately rather than later when the responses are read (see the sketch after this list). (#1370).
    • KIP-140 Admin API ACL fix: Better documentation of the describe and delete ACLs behavior when using the MATCH resource pattern type in a filter. (#1373).
    • Avro serialization examples: added a parameter for using a generic or specific Avro schema. (#1381).
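
    For illustration, here is a minimal sketch of creating an ACL through the Admin API; the broker address, topic name and principal are placeholders, and examples/adminapi in the repository remains the authoritative reference:

    from confluent_kafka.admin import (AclBinding, AclOperation, AclPermissionType,
                                       AdminClient, ResourcePatternType, ResourceType)

    admin = AdminClient({'bootstrap.servers': 'mybroker'})

    # Each binding in a single create_acls() request must be unique,
    # otherwise the call now raises immediately.
    binding = AclBinding(ResourceType.TOPIC, 'mytopic',
                         ResourcePatternType.LITERAL,
                         'User:alice', '*',
                         AclOperation.READ, AclPermissionType.ALLOW)

    futures = admin.create_acls([binding])
    for b, f in futures.items():
        try:
            f.result()  # None on success
            print('Created ACL {}'.format(b))
        except Exception as e:
            print('Failed to create ACL {}: {}'.format(b, e))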

    confluent-kafka-python is based on librdkafka v1.9.2, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.9.0(Jun 16, 2022)

    confluent-kafka-python v1.9.0

    v1.9.0 is a feature release:

    • OAUTHBEARER OIDC support (see the configuration sketch below)
    • KIP-140 Admin API ACL support
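
    As a rough sketch of what the new OIDC support looks like in configuration terms (the broker address, token endpoint and credentials are placeholders; see the librdkafka configuration documentation for the authoritative property names):

    from confluent_kafka import Producer

    producer = Producer({
        'bootstrap.servers': 'mybroker:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # Let librdkafka fetch and refresh the token itself via the OIDC
        # client-credentials flow instead of supplying an oauth_cb.
        'sasl.oauthbearer.method': 'oidc',
        'sasl.oauthbearer.client.id': 'my-client-id',
        'sasl.oauthbearer.client.secret': 'my-client-secret',
        'sasl.oauthbearer.token.endpoint.url': 'https://auth.example.com/token',
        'sasl.oauthbearer.scope': 'kafka',
    })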

    Fixes

    • The warning for use.deprecated.format (introduced in v1.8.2) had its logic reversed, which resulted in warning logs being emitted when the property was correctly configured, and the log message itself also had the advice backwards. The warning is now only emitted when use.deprecated.format is set to the old legacy encoding (True). #1265
    • Use str(Schema) rather than Schema.to_json to prevent fastavro from raising exception TypeError: unhashable type: 'mappingproxy'. (@ffissore, #1156, #1197)
    • Fix the argument order in the constructor signature for AvroDeserializer/Serializer: the argument order was altered in v1.6.1, but the example had not been updated to match. (@DLT1412, #1263)
    • Fix the json deserialization errors from _schema_loads for valid primitive declarations. (@dylrich, #989)

    confluent-kafka-python is based on librdkafka v1.9.0, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.8.2(Jan 7, 2022)

    confluent-kafka-python v1.8.2

    v1.8.2 is a maintenance release with the following fixes and enhancements:

    • IMPORTANT: Added mandatory use.deprecated.format to ProtobufSerializer and ProtobufDeserializer. See Upgrade considerations below for more information.
    • Python 2.7 binary wheels are no longer provided. Users still on Python 2.7 will need to build confluent-kafka from source and install librdkafka separately, see README.md for build instructions.
    • Added use.latest.version and skip.known.types (Protobuf) to the Serializer classes. (Robert Yokota, #1133).
    • list_topics() and list_groups() added to AdminClient.
    • Added support for headers in the SerializationContext (Laurent Domenech-Cabaud)
    • Fix crash in header parsing (Armin Ronacher, #1165)
    • Added long package description in setuptools (Bowrna, #1172).
    • Documentation fixes by Aviram Hassan and Ryan Slominski.
    • Don't raise AttributeError exception when CachedSchemaRegistryClient constructor raises a valid exception.

    confluent-kafka-python is based on librdkafka v1.8.2, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Note: There were no v1.8.0 and v1.8.1 releases.

    Upgrade considerations

    Protobuf serialization format changes

    Prior to this version the confluent-kafka-python client had a bug where nested protobuf schema indexes were incorrectly serialized, causing incompatibility with other Schema-Registry protobuf consumers and producers.

    This has now been fixed, but since the old defective serialization and the new correct serialization are mutually incompatible, users of confluent-kafka-python will need to make an explicit choice of which serialization format to use during a transitory phase while old producers and consumers are upgraded.

    The ProtobufSerializer and ProtobufDeserializer constructors now both take a configuration dictionary that (for the time being) requires the use.deprecated.format configuration property to be explicitly set.

    Producers should be upgraded first and as long as there are old (<=v1.7.0) Python consumers reading from topics being produced to, the new (>=v1.8.2) Python producer must be configured with use.deprecated.format set to True.

    When all existing messages in the topic have been consumed by older consumers, the consumers should be upgraded, and both the new producers and the new consumers must set use.deprecated.format to False.

    The requirement to explicitly set use.deprecated.format will be removed in a future version and the setting will then default to False (new format).
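
    A minimal sketch of the new constructor usage, assuming a hypothetical protoc-generated my_pb2 module and a Schema Registry at a placeholder URL:

    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.protobuf import (ProtobufDeserializer,
                                                          ProtobufSerializer)
    import my_pb2  # hypothetical protoc-generated module

    sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

    # use.deprecated.format is now mandatory; False selects the new,
    # Java-compatible framing, True keeps the old pre-v1.8.2 encoding.
    serializer = ProtobufSerializer(my_pb2.MyRecord, sr_client,
                                    {'use.deprecated.format': False})
    deserializer = ProtobufDeserializer(my_pb2.MyRecord,
                                        {'use.deprecated.format': False})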

    Source code(tar.gz)
    Source code(zip)
  • v1.7.0(May 12, 2021)

    v1.7.0 is a maintenance release with the following fixes and enhancements:

    • Add error_cb to confluent_cloud.py example (#1096).
    • Clarify that doc output varies based on method (@slominskir, #1098).
    • Docs say Schema when they mean SchemaReference (@slominskir, #1092).
    • Add documentation for NewTopic and NewPartitions (#1101).

    confluent-kafka-python is based on librdkafka v1.7.0, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.6.1(Mar 25, 2021)

    v1.6.1

    v1.6.1 is a feature release:

    • KIP-429 - Incremental consumer rebalancing support.
    • OAUTHBEARER support.

    Fixes

    • Add return_record_name=True to AvroDeserializer (@slominskir, #1028)
    • Fix deprecated schema.Parse call (@casperlehmann, #1006).
    • Make reader schema optional in AvroDeserializer (@97nitt, #1000).
    • Add **kwargs to legacy AvroProducer and AvroConsumer constructors to support all Consumer and Producer base class constructor arguments, such as logger (@venthur, #699).
    • Add bool for permanent schema delete (@slominskir, #1029).
    • The avro package is no longer required for Schema-Registry support (@jaysonsantos, #950).
    • Only write to schema cache once, improving performance (@fimmtiu, #724).
    • Improve Schema-Registry error reporting (@jacopofar, #673).
    • producer.flush() could return a non-zero value without hitting the specified timeout.

    confluent-kafka-python is based on librdkafka v1.6.1, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.6.0(Feb 5, 2021)

    v1.6.0

    v1.6.0 is a feature release with the following features, fixes and enhancements:

    • Bundles librdkafka v1.6.0 which adds support for Incremental rebalancing, Sticky producer partitioning, Transactional producer scalability improvements, and much more. See link to release notes below.
    • Rename asyncio.py example to avoid circular import (#945)
    • The Linux wheels are now built with manylinux2010 (rather than manylinux1) since OpenSSL v1.1.1 no longer builds on CentOS 5. Older Linux distros may thus no longer be supported, such as CentOS 5.
    • The in-wheel OpenSSL version has been updated to 1.1.1i.
    • Added Message.latency() to retrieve the per-message produce latency.
    • Added trove classifiers.
    • Consumer destructor will no longer trigger consumer_close(); consumer.close() must now be explicitly called if the application wants to leave the consumer group properly and commit final offsets.
    • Fix PY_SSIZE_T_CLEAN warning
    • Move confluent_kafka/ to src/ to avoid pytest/tox picking up the local dir
    • Added producer.purge() to purge messages in-queue/flight (@peteryin21, #548)
    • Added AdminClient.list_groups() API (@messense, #948)

    confluent-kafka-python is based on librdkafka v1.6.0, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.5.0(Jul 22, 2020)

  • v1.4.2(May 22, 2020)

    Confluent's Python client for Apache Kafka

    v1.4.2 is a maintenance release with the following fixes and enhancements:

    • Fix produce/consume hang after partition goes away and comes back, such as when a topic is deleted and re-created (regression in v1.3.0).
    • Consumer: Reset the stored offset when partitions are un-assign()ed. This fixes the case where a manual offset-less commit() or the auto-committer would commit a stored offset from a previous assignment before a new message was consumed by the application
    • Seed random number generator used for random broker selection.
    • Multiple calls to Consumer.close will not raise RuntimeError (@mkmoisen, #678)
    • Module lookup failures raise ImportError (@overstre #786)
    • Fix SchemaRegistryClient basic auth configuration (@blown302 #853)
    • Don't send empty credentials to SR in Authorization Header (#863)
    • Miscellaneous test cleanup and enhancements (@nicht, #843, #863)

    confluent-kafka-python is based on librdkafka v1.4.2, see the librdkafka v1.4.2 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.4.0(Apr 21, 2020)

    Confluent's Python client for Apache Kafka

    v1.4.0 is a feature release:

    • KIP-98: Transactional Producer API
    • KIP-345: Static consumer group membership (by @rnpridgeon)
    • KIP-511: Report client software name and version to broker
    • Generic Serde API (experimental)
    • New AvroSerializer and AvroDeserializer implementations including configurable subject name strategies.
    • JSON Schema support (For Schema Registry)
    • Protobuf support (For Schema Registry)

    confluent-kafka-python is based on librdkafka v1.4.0, see the librdkafka v1.4.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Transactional Producer API

    Release v1.4.0 for confluent-kafka-python adds complete Exactly-Once-Semantics (EOS) functionality, supporting the idempotent producer (since v1.0.0), a transaction-aware consumer (since v1.2.0) and full producer transaction support (v1.4.0).

    This enables developers to create Exactly-Once applications with Apache Kafka.

    See the Transactions in Apache Kafka page for an introduction and check the transactions example.
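
    A minimal sketch of the transactional produce flow (broker, topic and transactional.id are placeholders; the transactions example in the repository shows the full consume-transform-produce pattern):

    from confluent_kafka import Producer

    producer = Producer({
        'bootstrap.servers': 'mybroker',
        'transactional.id': 'my-transactional-id',
    })

    producer.init_transactions()   # register the transactional.id, once per process
    producer.begin_transaction()
    producer.produce('mytopic', key=b'key', value=b'value')
    # Commit everything produced since begin_transaction(), or call
    # producer.abort_transaction() instead to roll it back on error.
    producer.commit_transaction()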

    Generic Serializer API

    Release v1.4.0 introduces a new, experimental API which adds serialization capabilities to the Kafka Producer and Consumer. This feature provides the ability to configure Producer/Consumer key and value serializers/deserializers independently. Previously, all serialization had to be handled prior to calling Producer.produce and after Consumer.poll.

    This release ships with 3 built-in, Java compatible, standard serializer and deserializer classes:

    | Name    | Type    | Format            |
    |---------|---------|-------------------|
    | Double  | float   | IEEE 754 binary64 |
    | Integer | int     | int32             |
    | String  | Unicode | bytes*            |

    * The StringSerializer codec is configurable and supports any one of Python's standard encodings. If left unspecified 'UTF-8' will be used.

    Additional serialization implementations are possible through the extension of the Serializer and Deserializer base classes.

    See avro_producer.py and avro_consumer.py for example usage.
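
    For instance, the built-in serializers can be plugged into the new SerializingProducer roughly as follows (broker and topic names are placeholders):

    from confluent_kafka import SerializingProducer
    from confluent_kafka.serialization import IntegerSerializer, StringSerializer

    producer = SerializingProducer({
        'bootstrap.servers': 'mybroker',
        'key.serializer': IntegerSerializer(),          # keys encoded as int32
        'value.serializer': StringSerializer('utf_8'),  # values encoded as UTF-8 bytes
    })

    producer.produce('mytopic', key=42, value='hello')
    producer.flush()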

    Avro, Protobuf and JSON Schema Serializers

    Release v1.4.0 for confluent-kafka-python adds support for two new Schema Registry serialization formats with its Generic Serialization API; JSON and Protobuf. A new set of Avro Serialization classes have also been added to conform to the new API.

    | Format   | Serializer Example   | Deserializer Example |
    |----------|----------------------|----------------------|
    | Avro     | avro_producer.py     | avro_consumer.py     |
    | JSON     | json_producer.py     | json_consumer.py     |
    | Protobuf | protobuf_producer.py | protobuf_consumer.py |

    Security fixes

    Two security issues have been identified in the SASL SCRAM protocol handler:

    • The client nonce, which is expected to be a random string, was a static string.
    • If sasl.username and sasl.password contained characters that needed escaping, a buffer overflow and heap corruption would occur. This was protected, but too late, by an assertion.

    Both of these issues are fixed in this release.

    Enhancements

    • Bump OpenSSL to v1.0.2u
    • Bump monitoring-interceptors to v0.11.3

    Fixes

    General:

    • Remove unused variable from README example (@qzyse2017, #691)
    • Add delivery report to Avro example (#742)
    • Update asyncio example companion blog URL (@filippovitale, #760)

    Schema Registry/Avro:

    • Trim trailing / from Schema Registry base URL (@IvanProdaiko94 , #749)
    • Make automatic Schema registration optional (@dpfeif, #718)
    • Bump Apache Avro to 1.9.2[.1] (@RyanSkraba, #779)
    • Correct the SchemaRegistry authentication for SASL_INHERIT (@abij, #733)

    Also see the librdkafka v1.4.0 release notes for fixes to the underlying client implementation.

    Source code(tar.gz)
    Source code(zip)
  • v1.3.0(Dec 13, 2019)

    Confluent's Python client for Apache Kafka

    confluent-kafka-python is based on librdkafka v1.3.0, see the librdkafka v1.3.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    This is a feature release adding support for KIP-392 Fetch from follower, allowing a consumer to fetch messages from the closest replica to increase throughput and reduce cost.

    Features

    • KIP-392 - Fetch messages from closest replica / follower (by @mhowlett)
    • Python 3.8 binary wheel support for OSX and Linux. Windows Python 3.8 binary wheels are not currently available.

    Enhancements

    • New example using python3 and asyncio (by @mhowlett)
    • Add warnings for inconsistent security configuration.
    • Optimizations to hdr histogram (stats) rollover.
    • Print compression type per message-set when debug=msg
    • Various doc fixes, updates and enhancements (@edenhill , @mhowlett)

    Fixes

    • Fix crash when new topic is not created. (@Mostafa Razavi, #725)
    • Fix stringer/repr for SerializerError class (@ferozed, #675)
    • Fix consumer_lag in stats when consuming from broker versions <0.11.0.0 (regression in librdkafka v1.2.0).
    • Properly handle new Kafka-framed SASL GSSAPI frame semantics on Windows (#2542). This bug was introduced in v1.2.0 and broke GSSAPI authentication on Windows.
    • Fix msgq (re)insertion code to avoid O(N^2) insert sort operations on retry (#2508). The msgq insert code now properly handles interleaved and overlapping message range inserts, which may occur during Producer retries for high-throughput applications.
    • Fix producer insert msgq regression in v1.2.1 (#2450).
    • Upgrade builtin lz4 to 1.9.2 (CVE-2019-17543, #2598).
    • Don't trigger error when broker hostname changes (#2591).
    • Less strict message.max.bytes check for individual messages (#993).
    • Don't call timespec_get() on OSX (since it was removed in recent XCode) by @maparent .
    • LZ4 is available from ProduceRequest 0, not 3 (fixes assert in #2480).
    • Address 12 code issues identified by Coverity static code analysis.
    Source code(tar.gz)
    Source code(zip)
  • v1.2.0(Sep 27, 2019)

    Confluent's Python client for Apache Kafka

    confluent-kafka-python is based on librdkafka v1.2.0, see the librdkafka v1.2.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    • Transaction aware consumer (isolation.level=read_committed) implemented by @mhowlett.
    • Sub-millisecond buffering (linger.ms) on the producer.
    • Improved authentication errors (KIP-152)

    Consumer-side transaction support

    This release adds consumer-side support for transactions. In previous releases, the consumer always delivered all messages to the application, even those in aborted or not yet committed transactions. In this release, the consumer will by default skip messages in aborted transactions. This is controlled through the new isolation.level configuration property, which defaults to read_committed (only read committed messages, filtering out aborted and not-yet committed transactions). To consume all messages, including those in aborted transactions, set this property to read_uncommitted to get the behaviour of previous releases. For consumers in read_committed mode, the end of a partition is now defined to be the offset of the last message of a successfully committed transaction (referred to as the 'Last Stable Offset'). For non-transactional messages there is no change from previous releases: they will always be read, but a consumer will not advance into a not yet committed transaction on the partition.
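
    For example, a consumer that needs the previous behaviour of also seeing messages from aborted or in-flight transactions could be configured roughly like this (broker and group names are placeholders):

    from confluent_kafka import Consumer

    consumer = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        # read_committed is the default; read_uncommitted restores the
        # pre-v1.2.0 behaviour of delivering all messages.
        'isolation.level': 'read_uncommitted',
    })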

    Upgrade considerations

    • linger.ms default was changed from 0 to 0.5 ms to promote some level of batching even with default settings.

    New configuration properties

    • Consumer property isolation.level=read_committed ensures the consumer will only read messages from successfully committed producer transactions. Default is read_committed. To get the previous behaviour, set the property to read_uncommitted, which will read all messages produced to a topic, regardless of whether the message was part of an aborted or not yet committed transaction.

    Enhancements

    • Cache FastAvro schema for improved Avro Serialization/Deserialization (@BBM89, #627)
    • Protocol decoding optimization, increasing consume performance.
    • Add CachedSchemaRegistry docs (@lowercase24 , #495)

    Fixes

    General:

    • Rate limit IO-based queue wakeups to linger.ms, this reduces CPU load and lock contention for high throughput producer applications. (#2509)
    • SSL: Use only hostname (not port) when valid broker hostname (by Hunter Jacksson)
    • SSL: Ignore OpenSSL cert verification results if enable.ssl.certificate.verification=false (@salisbury-espinosa)
    • SASL Kerberos/GSSAPI: don't treat kinit ECHILD errors as errors (@hannip)
    • Refresh broker list metadata even if no topics to refresh (#2476)
    • Correct AdminClient doc (@lowercase24, #653)
    • Update Avro example to be compliant with csh (@andreyferriyan , #668)
    • Correct Avro example typo (@AkhilGNair, #598)

    Consumer:

    • Make pause|resume() synchronous, ensuring that a subsequent poll() will not return messages for the paused partitions.
    • Consumer doc fixes (@hrchu , #646, #648)

    Producer:

    • Fix message timeout handling for leader-less partitions.
    • message.timeout.ms=0 is now accepted even if linger.ms > 0 (by Jeff Snyder)
    Source code(tar.gz)
    Source code(zip)
  • v1.1.0(Jul 15, 2019)

    Confluent's Python client for Apache Kafka

    confluent-kafka-python is based on librdkafka v1.1.0, see the librdkafka v1.1.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    • In-memory SSL certificates (PEM, DER, PKCS#12) support (by @noahdav at Microsoft)
    • Use Windows Root/CA SSL Certificate Store (by @noahdav at Microsoft)
    • ssl.endpoint.identification.algorithm=https (off by default) to validate the broker hostname matches the certificate. Requires OpenSSL >= 1.0.2 (included with wheel installations).
    • Improved GSSAPI/Kerberos ticket refresh
    • Confluent monitoring interceptor package bumped to v0.11.1 (#634)

    Upgrade considerations

    • Windows SSL users will no longer need to specify a CA certificate file/directory (ssl.ca.location), librdkafka will load the CA certs by default from the Windows Root Certificate Store.
    • SSL peer (broker) certificate verification is now enabled by default (disable with enable.ssl.certificate.verification=false)
    • %{broker.name} is no longer supported in sasl.kerberos.kinit.cmd since kinit refresh is no longer executed per broker, but per client instance.

    SSL

    New configuration properties:

    • ssl.key.pem - client's private key as a string in PEM format
    • ssl.certificate.pem - client's public key as a string in PEM format (see the sketch after this list)
    • enable.ssl.certificate.verification - enable(default)/disable OpenSSL's builtin broker certificate verification.
    • ssl.endpoint.identification.algorithm - to verify the broker's hostname against its certificate (disabled by default).
    • Add new rd_kafka_conf_set_ssl_cert() to pass PKCS#12, DER or PEM certs in (binary) memory form to the configuration object.
    • The private key data is now securely cleared from memory after last use.
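
    A rough sketch of supplying in-memory PEM credentials from Python; the PEM strings below are placeholders standing in for wherever the application actually keeps its credentials (a file, a secrets manager, etc.):

    from confluent_kafka import Producer

    # Placeholder PEM strings.
    private_key_pem = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
    certificate_pem = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"

    producer = Producer({
        'bootstrap.servers': 'mybroker:9093',
        'security.protocol': 'SSL',
        'ssl.key.pem': private_key_pem,          # client private key as a PEM string
        'ssl.certificate.pem': certificate_pem,  # client public key as a PEM string
    })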

    Enhancements

    • Bump message.timeout.ms max value from 15 minutes to 24 days (@sarkanyi, workaround for #2015)

    Fixes

    • SASL GSSAPI/Kerberos: Don't run kinit refresh for each broker, just per client instance.
    • SASL GSSAPI/Kerberos: Changed sasl.kerberos.kinit.cmd to first attempt ticket refresh, then acquire.
    • SASL: Proper locking on broker name acquisition.
    • Consumer: max.poll.interval.ms now correctly handles blocking poll calls, allowing a longer poll timeout than the max poll interval.
    • configure: Fix libzstd static lib detection
    • PyTest pinned to latest version supporting python 2 (#634)
    Source code(tar.gz)
    Source code(zip)
  • v1.0.1(Jun 13, 2019)

    Confluent's Python client for Apache Kafka

    confluent-kafka-python is based on librdkafka v1.0.1, see the librdkafka v1.0.1 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    v1.0.1 is a maintenance release with the following fixes:

    • Fix consumer stall when broker connection goes down (issue #2266 introduced in v1.0.0)
    • Fix AdminAPI memory leak when broker does not support request (@souradeep100, #2314)
    • SR client: Don't disable cert verification if no ssl.ca.location set (#578)
    • Treat ECONNRESET as standard Disconnects (#2291)
    • OpenSSL version bump to 1.0.2s
    • Update/fix protocol error response codes (@benesch)
    • Update Consumer get_watermark_offsets docstring (@hrchu, #572)
    • Update Consumer subscribe docstring to include on_assign and on_revoke args (@hrchu, #571)
    • Update delivery report string formatting (@hrchu, #575)
    • Update logging configuration code example document (@soxofaan , #579)
    • Implement environment markers to fix poetry (@fishman, #583)
    Source code(tar.gz)
    Source code(zip)
  • v1.0.0(Apr 5, 2019)

    Confluent's Python client for Apache Kafka v1.0.0

    confluent-kafka-python is based on librdkafka v1.0.0, see the librdkafka v1.0.0 release notes for a complete list of changes, enhancements and fixes and upgrade considerations.

    v1.0.0 is a major feature release:

    • Idempotent producer - guaranteed ordering and exactly-once producing support.
    • Sparse/on-demand connections - connections are no longer maintained to all brokers in the cluster.
    • KIP-62 - max.poll.interval.ms support in the Consumer.

    This release also changes configuration defaults and deprecates a set of configuration properties, make sure to read the Upgrade considerations section below.

    Upgrade considerations (IMPORTANT)

    Configuration default changes

    The following configuration properties have changed default values, which may require application changes:

    • acks (alias request.required.acks) now defaults to all: wait for all in-sync replica brokers to ack. The previous default, 1, only waited for an ack from the partition leader. This change places a greater emphasis on durability at a slight cost to latency. It is not recommended that you lower this value unless latency takes a higher precedence than data durability in your application.

    • broker.version.fallback now defaults to 0.10, previously 0.9. broker.version.fallback.ms now defaults to 0. Users on Apache Kafka <0.10 must set api.version.request=false and broker.version.fallback=.. to their broker version. For users on >=0.10 there is no longer any need to specify any of these properties.

    • enable.partition.eof now defaults to false. KafkaError._PARTITION_EOF was previously emitted by default to signify the consumer has reached the end of a partition. Applications which rely on this behavior must now explicitly set enable.partition.eof=true if this behavior is required. This change simplifies the more common case where consumer applications consume in an endless loop.

    group.id is now required for Python consumers.

    Deprecated configuration properties

    The following configuration properties have been deprecated. Use of any deprecated configuration property will result in a warning when the client instance is created. The deprecated configuration properties will be removed in a future release.

    librdkafka:

    • offset.store.method=file is deprecated.
    • offset.store.path is deprecated.
    • offset.store.sync.interval.ms is deprecated.
    • produce.offset.report is no longer used. Offsets are always reported.
    • queuing.strategy was an experimental property that is now deprecated.
    • reconnect.backoff.jitter.ms is no longer used, see reconnect.backoff.ms and reconnect.backoff.max.ms.
    • socket.blocking.max.ms is no longer used.
    • topic.metadata.refresh.fast.cnt is no longer used.

    confluent_kafka:

    • default.topic.config is deprecated.
    • CachedSchemaRegistryClient: url was str, now a conf dict with all application config properties

    Idempotent Producer

    This release adds support for Idempotent Producer, providing exactly-once producing and guaranteed ordering of messages.

    Enabling idempotence is as simple as setting the enable.idempotence configuration property to true.

    There are no required application changes, but it is recommended to add support for the newly introduced fatal errors that will be triggered when the idempotent producer encounters an unrecoverable error that would break the ordering or duplication guarantees.

    See Idempotent Producer in the manual and the Exactly once semantics blog post for more information.
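
    A minimal sketch, using a placeholder broker address:

    from confluent_kafka import Producer

    producer = Producer({
        'bootstrap.servers': 'mybroker',
        # Enables the idempotent producer: per-partition ordering is preserved
        # and retries cannot introduce duplicates.
        'enable.idempotence': True,
    })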

    Sparse connections

    In previous releases librdkafka would maintain open connections to all brokers in the cluster and the bootstrap servers.

    With this release librdkafka now connects to a single bootstrap server to retrieve the full broker list, and then connects to the brokers it needs to communicate with: partition leaders, group coordinators, etc.

    For large scale deployments this greatly reduces the number of connections between clients and brokers, and avoids the repeated idle connection closes for unused connections.

    Sparse connections are on by default (the recommended setting); the old behavior of connecting to all brokers in the cluster can be re-enabled by setting enable.sparse.connections=false.

    See Sparse connections in the manual for more information.

    Original issue librdkafka #825.

    KIP-62 - max.poll.interval.ms is enforced

    This release adds support for max.poll.interval.ms (KIP-62), which requires the application to call consumer.poll() at least every max.poll.interval.ms. Failure to do so will make the consumer automatically leave the group, causing a group rebalance, and not rejoin the group until the application has called ..poll() again, triggering yet another group rebalance. max.poll.interval.ms is set to 5 minutes by default.
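
    In practice this means the processing loop must keep calling poll() within the configured interval; a rough sketch (broker, group and topic names are placeholders):

    from confluent_kafka import Consumer

    consumer = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        'max.poll.interval.ms': 300000,  # 5 minutes, the default
    })
    consumer.subscribe(['mytopic'])

    while True:
        # poll() must be called again within max.poll.interval.ms,
        # otherwise the consumer leaves the group and a rebalance is triggered.
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print('Processing {} [{}] offset {}'.format(msg.topic(), msg.partition(), msg.offset()))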

    Enhancements

    • OpenSSL version bumped to 1.0.2r
    • AvroProducer now supports encoding with fastavro (#492)
    • Simplify CachedSchemaRegistryClient configuration with configuration dict for application configs
    • Add Delete Schema support to CachedSchemaRegistryClient
    • CachedSchemaRegistryClient now supports HTTP Basic Auth (#440)
    • MessageSerializer now supports specifying reader schema (#470)

    Fixes

    • Fix crash when calling Consumer.consume without setting group.id (now required)
    • CachedSchemaRegistryClient handles get_compatibility properly

    Build/installation/tooling

    • Integration tests moved to docker-compose to aid in cluster set-up/tear-down
    • Runner script ./tests/run.sh added to simplify unit and integration test execution
    Source code(tar.gz)
    Source code(zip)
  • v0.11.6(Oct 24, 2018)

    See librdkafka v0.11.6 release notes for enhancements and fixes in librdkafka.

    New Features

    • Confluent Monitoring Interceptors are now included with Linux and OSX binary wheel distributions. (#464)
    • Experimental binary wheel distributions for Windows environments. (#451)

    Enhancements

    • OpenSSL version bump to 1.0.2p. (#437)
    • Topic configurations have been moved into the global configuration dictionary to simplify configuration. The property default.topic.config has been deprecated and will be removed in 1.0, but still takes precedence over topic configuration specified in the global configuration dictionary (see the sketch after this list). (#446)
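
    For example, a topic-level property such as auto.offset.reset can now be given directly in the top-level dictionary; a rough before/after sketch with placeholder broker and group names:

    from confluent_kafka import Consumer

    # New style: topic-level properties go directly into the global dictionary.
    consumer = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
    })

    # Deprecated style (still accepted, slated for removal in 1.0):
    # Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
    #           'default.topic.config': {'auto.offset.reset': 'earliest'}})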

    Fixes

    • Handle debug configuration property prior to plugin.library.paths for enhanced debugging. (#464)
    • Fix memory leak in message headers. (#458)
    • Safely release handler resources. (#434, @coldeasy)
    Source code(tar.gz)
    Source code(zip)
  • v0.11.5(Jul 20, 2018)

    Admin Client support

    v0.11.5 is a feature release that adds support for the Kafka Admin API (KIP-4).

    Admin API

    This release adds support for the Admin API, enabling applications and users to perform administrative Kafka tasks programmatically:

    • Create topics - specifying partition count, replication factor and topic configuration.
    • Delete topics - delete topics in cluster.
    • Create partitions - extend a topic with additional partitions.
    • Alter configuration - set, modify or delete configuration for any Kafka resource (topic, broker, ..).
    • Describe configuration - view configuration for any Kafka resource.

    The API closely follows the Java Admin API:

    def example_create_topics(a, topics):
        new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]
        # Call create_topics to asynchronously create topics
        fs = a.create_topics(new_topics)
    
        # Wait for operation to finish.
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                print("Topic {} created".format(topic))
            except Exception as e:
                print("Failed to create topic {}: {}".format(topic, e))
    

    Additional examples can be found in examples/adminapi
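
    As a further illustration, extending an existing topic with more partitions follows the same futures-based pattern; a minimal sketch with placeholder names:

    from confluent_kafka.admin import AdminClient, NewPartitions

    a = AdminClient({'bootstrap.servers': 'mybroker'})

    # Grow 'mytopic' to a total of 6 partitions.
    fs = a.create_partitions([NewPartitions('mytopic', 6)])
    for topic, f in fs.items():
        try:
            f.result()  # None on success
            print("Added partitions to topic {}".format(topic))
        except Exception as e:
            print("Failed to add partitions to topic {}: {}".format(topic, e))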

    Enhancements

    • Schema Registry HTTPS support with TLS client auth added (#90)
    • Metadata API list_topics() added (#161, @tbsaunde, @stephan-hof)
    • Expose librdkafka built-in partitioner options directly (#396)
    • Callback based throttle event handling; throttle_cb (#237) (#377)
    • Added Unicode support for header values (#382)
    • OpenSSL version bump to 1.0.2o (#410)
    • Avro documentation added to the docs (#382)
    • Python 3.7 support (#382)
    • Allow passing headers as both list(tuples) and dict() (#355)
    • Support for legacy setuptool's install_requires (#399)

    Fixes

    • Release GIL before making blocking calls (#412)
    • Prevent application config dict mutation (#412)
    • Intercept plugin configurations to ensure proper ordering (#404)
    • test_compatibility() should return False, not None, when unable to check compatibility; it previously returned None (#372, @Enether)
    • Schema Registry client returns false when unable to check compatibility(#372, @Enether)
    • Fix invocation of SchemaParseException (#376)
    • Fix call ordering to avoid callback crash on implicit close (#265)
    • Fix memory leaks in generic client setters (#382)
    • Fix AvroProducer/AvroConsumer key/value identity check (#342)
    • Correct Producer.produce documentation to use correct time unit of seconds (#384) (#385)
    • Fix KafkaError refcounting which could lead to memory leaks (#382)
    Source code(tar.gz)
    Source code(zip)
  • v0.11.4(Apr 2, 2018)

    Simplified installation

    This release adds binary wheels containing all required dependencies (librdkafka, openssl, zlib, etc) for Linux and OSX.

    Should these wheels not work on your platform, please file an issue outlining what is failing, and then use the previous method of installing librdkafka manually followed by pip install --no-binary :all: confluent-kafka

    Message header support

    Support for Kafka message headers has been added (requires broker version >= v0.11.0).

    When producing messages, simply provide a list of (key, value) tuples as headers=:

        myproducer.produce(topic, 'A message payload', headers=[('hdr1', 'val1'), ('another', 'one'), ('hdr1', 'duplicates are supported and ordering is retained')])
    

    Message headers are returned as a list of tuples for consumed messages:

       msg = myconsumer.poll(1)
       if msg is not None and not msg.error():
           headers = msg.headers()
           if headers is not None:
               # convert to dict, collapsing duplicate header keys
               headers_dict = dict(headers)
    

    Enhancements

    • Message header support (@johnistan)
    • Added Consumer.seek()
    • Added consumer.pause/resume support (closes #120, @dangra)
    • Added Consumer.store_offsets() API (#245, @ctrochalakis)
    • Support for passing librdkafka logs to the standard logging module (see logger kwarg in constructors) (#148)
    • Enable produce.offset.report by default (#266) (#267)
    • Expose offsets_for_times consumer method. closes #224 (#268, @johnistan)
    • Add batch consume() API (closes #252, #282, @tburmeister)
    • Add hash func for UnionSchema (#228, @fyndiq)
    • Use schemaless reader to handle complex schema (#251, @fpietka)

    Fixes

    • Fix librdkafka install command for macOS (#281, @vkroz)
    • Constructors now support both dict and kwargs
    • Add __version__ to __init__.py (@mrocklin)
    • Messages could be leaked and lost if an exception was raised from a callback triggered by poll()
    • Make Consumer.commit(..,asynchronous=False) return offset commit results
    • Raise runtime error if accessing consumer after consumer close (#262, @johnistan)
    • Pass py.test arguments from tox (@ctrochalakis)
    • Rename async kwargs to asynchronous (async will continue working until the 1.0 API bump)
    Source code(tar.gz)
    Source code(zip)
  • v0.11.0(Jul 25, 2017)

    This is a minimal librdkafka version-synchronized release of the Python client.

    Changes:

    • Handle null/None values during deserialization
    • Allow passing a custom schema registry instance.
    • None conf values are now converted to NULL rather than the string "None" (#133)
    • Fix memory leaks when certain exceptions were raised.
    • Handle delivery.report.only.error in Python (#84)
    • Proper use of Message error string on Producer (#129)
    • Now Flake8 clean
    Source code(tar.gz)
    Source code(zip)
  • v0.9.1.2(Jul 8, 2016)
