Displays chain of errors coming from kafka client.#200
Merged
mashhurs merged 3 commits intologstash-plugins:mainfrom Aug 28, 2025
Merged
Displays chain of errors coming from kafka client.#200mashhurs merged 3 commits intologstash-plugins:mainfrom
mashhurs merged 3 commits intologstash-plugins:mainfrom
Conversation
donoghuc
reviewed
Aug 27, 2025
lib/logstash/outputs/kafka.rb
Outdated
| :cause => e.respond_to?(:getCause) ? e.getCause() : nil) | ||
| :cause => cause_error) | ||
| while cause_error != nil | ||
| logger.error("Kafka producer error chain", |
Contributor
There was a problem hiding this comment.
This will duplicate the Error log message right? Should we re-assign cause_error with getCause, or just put the log message in the while loop?
Contributor
There was a problem hiding this comment.
donoghuc
reviewed
Aug 27, 2025
lib/logstash/outputs/kafka.rb
Outdated
| logger.error("Unable to create Kafka producer from given configuration", | ||
| :kafka_error_message => e, | ||
| :cause => e.respond_to?(:getCause) ? e.getCause() : nil) | ||
| :cause => cause_error) |
Contributor
There was a problem hiding this comment.
Suggested change
| :cause => cause_error) | |
| :cause => cause_error) | |
| cause_error = e.respond_to?(:getCause) ? e.getCause() : nil |
Avoid duplicating the top level err code when unwinding the error stack.
logstash | [2025-08-27T22:20:02,962][ERROR][logstash.outputs.kafka ][main] Unable to create Kafka producer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient>}
logstash | [2025-08-27T22:20:02,962][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient>}
Contributor
Author
There was a problem hiding this comment.
Ah right, it duplicates.
WDYT for this commit? It just goes throw each exception where cause will be inner one.
logstash | [2025-08-28T00:02:01,315][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer"}
logstash | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient"}
logstash | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/share/logstash/kerberos/ssl/logstash.keystore.jks of type JKS"}
logstash | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/kerberos/ssl/logstash.keystore.jks of type JKS"}
logstash | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>"Java::JavaIo::IOException: keystore password was incorrect"}
logstash | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka ][main] Kafka producer error chain {:kafka_error_message=>"Java::JavaSecurity::UnrecoverableKeyException: failed to decrypt safe contents entry: javax.crypto.BadPaddingException: Given final block not properly padded. Such issues can arise if a bad key is used during decryption."}
logstash | [2025-08-28T00:02:01,316][ERROR][logstash.javapipeline ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer>, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:476)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:297)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:324)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:309)", "jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(jdk/internal/reflect/DirectConstructorHandleAccessor.java:62)", "java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:502)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:486)", "org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:165)", "org.jruby.RubyClass.new(org/jruby/RubyClass.java:922)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)", "RUBY.create_producer(/usr/share/logstash/vendor/local_gems/22313566/logstash-integration-kafka-11.6.4-universal.arm64e-darwin-24/lib/logstash/outputs/kafka.rb:392)", "RUBY.register(/usr/share/logstash/vendor/local_gems/22313566/logstash-integration-kafka-11.6.4-universal.arm64e-darwin-24/lib/logstash/outputs/kafka.rb:232)", "org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:598)", "org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:349)", "org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.reg(org/logstash/config/ir/compiler/OutputStrategyExt.java:275)", "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.register(org/logstash/config/ir/compiler/OutputStrategyExt.java:131)", "org.logstash.config.ir.compiler.OutputDelegatorExt.doRegister(org/logstash/config/ir/compiler/OutputDelegatorExt.java:126)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.register(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:69)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt$INVOKER$i$0$0$register.call(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt$INVOKER$i$0$0$register.gen)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:245)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(org/jruby/RubyArray$INVOKER$i$0$0$each.gen)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:244)", "RUBY.maybe_setup_out_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:622)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:257)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:198)", "RUBY.start(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:150)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:354)", "java.lang.Thread.run(java/lang/Thread.java:1583)"], "pipeline.sources"=>["/usr/share/logstash/config/kafkaout-kerberos.conf"], :thread=>"#<Thread:0x792cc33a /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:138 run>"}
logstash | [2025-08-28T00:02:01,317][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
logstash | [2025-08-28T00:02:01,328][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
Contributor
There was a problem hiding this comment.
i like it. Simplifies the control flow too. Nice.
donoghuc
approved these changes
Aug 28, 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Current implementation hides the detail errors coming from kafka-client when initializing a producer. Kafka client creates a chain of
KafkaExceptions (e.gKafkaException("message", new KafkaException("NetworkClient error", new KafkaException())) but current following line shows only first error.We need iterate over error chains and show to provide descriptive error messages.
With this PR , we get chain of exception details. For the example when password is incorrect, we get
keystore password was incorrectin the chain errors.