Error with .NET integration test

Hi all,

I am trying to create an integration test that verifies that sending and receiving messages work as expected. I am doing the setup on my Windows machine.

I have set up Kafka locally and started ZooKeeper, Kafka, and the schema registry. I have also used the console consumer and producer scripts to verify that I can send and receive messages on the topic.

However, when I try to run my test in Visual Studio, I get the following error:

Confluent.Kafka.ProduceException`2[[System.String, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]] : Local: Key serialization error
---- System.Net.Http.HttpRequestException : [http://localhost:8082/] HttpRequestException: No connection could be made because the target machine actively refused it. (127.0.0.1:3128)

My schema registry properties are as shown below

and my test config is below

(screenshot)

Appreciate the help

Thanks
Kris

Port 3128 smells like a proxy server; you might check whether there is some environment setting that is forcing connections through a proxy.
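One quick way to rule that out is to list any proxy-related variables the process would inherit (a bash sketch; on Windows, `set | findstr /i proxy` in cmd or `Get-ChildItem Env: | Where-Object Name -match 'proxy'` in PowerShell does the same job):

```shell
# List the proxy-related environment variables that .NET's HttpClient
# consults when building its default, environment-based proxy.
found=0
for v in HTTP_PROXY HTTPS_PROXY ALL_PROXY NO_PROXY \
         http_proxy https_proxy all_proxy no_proxy; do
  val=$(printenv "$v")
  if [ -n "$val" ]; then
    echo "$v=$val"
    found=1
  fi
done
if [ "$found" -eq 0 ]; then
  echo "no proxy variables set"
fi
```

If anything here points at port 3128, that is almost certainly where the connection attempt is coming from.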

I had a look at the schema registry startup log, and there is no reference to 3128:

[2022-09-08 15:37:58,608] INFO SchemaRegistryConfig values:
        access.control.allow.headers =
        access.control.allow.methods =
        access.control.allow.origin =
        access.control.skip.options = true
        authentication.method = NONE
        authentication.realm =
        authentication.roles = [*]
        authentication.skip.paths = []
        avro.compatibility.level =
        compression.enable = true
        csrf.prevention.enable = false
        csrf.prevention.token.endpoint = /csrf
        csrf.prevention.token.expiration.minutes = 30
        csrf.prevention.token.max.entries = 10000
        debug = false
        dos.filter.delay.ms = 100
        dos.filter.enabled = false
        dos.filter.insert.headers = true
        dos.filter.ip.whitelist = []
        dos.filter.managed.attr = false
        dos.filter.max.idle.tracker.ms = 30000
        dos.filter.max.requests.ms = 30000
        dos.filter.max.requests.per.sec = 25
        dos.filter.max.wait.ms = 50
        dos.filter.remote.port = false
        dos.filter.throttle.ms = 30000
        dos.filter.throttled.requests = 5
        dos.filter.track.global = false
        host.name = 127.0.0.1
        http2.enabled = true
        idle.timeout.ms = 30000
        inter.instance.headers.whitelist = []
        inter.instance.protocol = http
        kafkastore.bootstrap.servers = [PLAINTEXT://localhost:9092]
        kafkastore.checkpoint.dir = /tmp
        kafkastore.checkpoint.version = 0
        kafkastore.connection.url =
        kafkastore.group.id =
        kafkastore.init.timeout.ms = 60000
        kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
        kafkastore.sasl.kerberos.min.time.before.relogin = 60000
        kafkastore.sasl.kerberos.service.name =
        kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
        kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
        kafkastore.sasl.mechanism = GSSAPI
        kafkastore.security.protocol = PLAINTEXT
        kafkastore.ssl.cipher.suites =
        kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
        kafkastore.ssl.endpoint.identification.algorithm =
        kafkastore.ssl.key.password = [hidden]
        kafkastore.ssl.keymanager.algorithm = SunX509
        kafkastore.ssl.keystore.location =
        kafkastore.ssl.keystore.password = [hidden]
        kafkastore.ssl.keystore.type = JKS
        kafkastore.ssl.protocol = TLS
        kafkastore.ssl.provider =
        kafkastore.ssl.trustmanager.algorithm = PKIX
        kafkastore.ssl.truststore.location =
        kafkastore.ssl.truststore.password = [hidden]
        kafkastore.ssl.truststore.type = JKS
        kafkastore.timeout.ms = 500
        kafkastore.topic = _schemas
        kafkastore.topic.replication.factor = 3
        kafkastore.topic.skip.validation = false
        kafkastore.update.handlers = []
        kafkastore.write.max.retries = 5
        leader.eligibility = true
        listeners = [http://0.0.0.0:8082]
        master.eligibility = null
        metric.reporters = []
        metrics.jmx.prefix = kafka.schema.registry
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        metrics.tag.map = []
        mode.mutability = true
        port = 8082
        request.logger.name = io.confluent.rest-utils.requests
        request.queue.capacity = 2147483647
        request.queue.capacity.growby = 64
        request.queue.capacity.init = 128
        resource.extension.class = []
        resource.extension.classes = []
        resource.static.locations = []
        response.http.headers.config =
        response.mediatype.default = application/vnd.schemaregistry.v1+json
        response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
        rest.servlet.initializor.classes = []
        schema.cache.expiry.secs = 300
        schema.cache.size = 1000
        schema.canonicalize.on.consume = []
        schema.compatibility.level = backward
        schema.providers = []
        schema.registry.group.id = schema-registry
        schema.registry.inter.instance.protocol =
        schema.registry.resource.extension.class = []
        shutdown.graceful.ms = 1000
        ssl.cipher.suites = []
        ssl.client.auth = false
        ssl.client.authentication = NONE
        ssl.enabled.protocols = []
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm =
        ssl.keystore.location =
        ssl.keystore.password = [hidden]
        ssl.keystore.reload = false
        ssl.keystore.type = JKS
        ssl.keystore.watch.location =
        ssl.protocol = TLS
        ssl.provider =
        ssl.trustmanager.algorithm =
        ssl.truststore.location =
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        thread.pool.max = 200
        thread.pool.min = 8
        websocket.path.prefix = /ws
        websocket.servlet.initializor.classes = []
 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:376)

In terms of the test itself, I could not see any reference to the use of a proxy

public KafkaIntegrationTest()
{
    var configuration = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", false)
                .Build();

    _config = Options.Create(configuration.GetSection("KafkaProducer").Get<KafkaProducerOptions>());
}

[Fact]
public async Task ProduceUnderwritingEventTestAsync()
{
    var value = "testmessage";
    IDictionary<string, string>? headers = new Dictionary<string, string>();
    headers.Add("", "");

    await new KafkaProducer(_config).Produce<String, String>("12333", value, headers);

    //Assert.Equal(value, message.ToString());
}
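For what it's worth, the test code never needs to mention a proxy for one to be used: proxy settings are picked up from environment variables that the test host process inherits from its parent. A minimal shell illustration of that inheritance (the variable name is the one .NET's HttpClient consults; the URL is just an example):

```shell
# A child process (here a nested shell, but the Visual Studio test host
# behaves the same way) inherits HTTP_PROXY from its parent:
HTTP_PROXY=http://localhost:3128 sh -c 'echo "child sees HTTP_PROXY=$HTTP_PROXY"'
# prints: child sees HTTP_PROXY=http://localhost:3128
```

So the variable may be set at the user or machine level in Windows, or in the environment Visual Studio itself was launched from, without appearing anywhere in the test project.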

Would you know where this could be set?

The proxy could easily be set in something like the environment. For example, running the Avro Schema Registry example from confluent-kafka-dotnet/examples/AvroGeneric at master · confluentinc/confluent-kafka-dotnet · GitHub, but with the HTTP_PROXY environment variable set:

HTTP_PROXY=http://localhost:3128 bin/Debug/net6.0/AvroGeneric localhost:9092 http://localhost:8081 foo

… I get a similar error:

foo
error producing message: Confluent.Kafka.ProduceException`2[System.String,Avro.Generic.GenericRecord]: Local: Value serialization error
 ---> System.Net.Http.HttpRequestException: [http://localhost:8081/] HttpRequestException: Connection refused (localhost:3128)
   at Confluent.SchemaRegistry.RestService.ExecuteOnOneInstanceAsync(Func`1 createRequest) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.SchemaRegistry/Rest/RestService.cs:line 248
   at Confluent.SchemaRegistry.RestService.RequestAsync[T](String endPoint, HttpMethod method, Object[] jsonBody) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.SchemaRegistry/Rest/RestService.cs:line 257
   at Confluent.SchemaRegistry.RestService.RegisterSchemaAsync(String subject, Schema schema, Boolean normalize) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.SchemaRegistry/Rest/RestService.cs:line 321
   at Confluent.SchemaRegistry.CachedSchemaRegistryClient.RegisterSchemaAsync(String subject, Schema schema, Boolean normalize) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs:line 377
   at Confluent.SchemaRegistry.Serdes.GenericSerializerImpl.Serialize(String topic, GenericRecord data, Boolean isKey) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs:line 151
   at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.SerializeAsync(T value, SerializationContext context) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs:line 163
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.Kafka/Producer.cs:line 771
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/src/Confluent.Kafka/Producer.cs:line 777
   at Confluent.Kafka.Examples.AvroGeneric.Program.Main(String[] args) in /Users/brettrandall/src/confluentinc/confluent-kafka-dotnet/examples/AvroGeneric/Program.cs:line 112

Make sure neither HTTP_PROXY nor HTTPS_PROXY is set in your test environment.
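A sketch of clearing them before running the tests (bash syntax; in PowerShell the equivalent would be `Remove-Item Env:HTTP_PROXY`, etc.). If the proxy genuinely has to stay, exempting local endpoints via NO_PROXY, which I believe recent .NET versions honor for the environment-based proxy, is an alternative:

```shell
# Remove the proxy variables from the current session (both spellings,
# since different tools check different cases):
unset HTTP_PROXY HTTPS_PROXY ALL_PROXY http_proxy https_proxy all_proxy

# Alternatively, keep the proxy but exempt the local Kafka / Schema Registry:
export NO_PROXY=localhost,127.0.0.1

# Sanity check:
if [ -z "$HTTP_PROXY" ] && [ -z "$HTTPS_PROXY" ]; then
  echo "proxy variables cleared"
fi
```

Note this only affects processes started from the same session; if the variable is set at the Windows user or machine level, Visual Studio needs to be restarted after removing it for the test host to pick up the change.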