No Brokers available when trying to connect to Kafka through Cloudera Data Science Workbench -


i trying implement github project (https://github.com/tomatotomahto/cdh-sensor-analytics) on our internal hadoop cluster via cloudera data science workbench.

on running project on cloudera data science workbench, error "no brokers available" when trying connect kafka through python api kafkaproducer(bootstrap_servers='broker1:9092') [code can found in https://github.com/tomatotomahto/cdh-sensor-analytics/blob/master/datagenerator/kafkaconnection.py].

i have authenticated using kerberos. have tried giving broker node without port number, , list. but, nothing has worked far.

below stack trace.

nobrokersavailable: nobrokersavailable nobrokersavailable                        traceback (most recent call  last) in engine ----> 1 dgen = datagenerator(config)  /home/cdsw/datagenerator/datagenerator.py in __init__(self, config)  39           40         self._kudu = kuduconnection(self._config['kudu_master'],  self._config['kudu_port'], spark) ---> 41         self._kafka =  kafkaconnection(self._config['kafka_brokers'],  self._config['kafka_topic'])  42   43         #self._kafka  /home/cdsw/datagenerator/kafkaconnection.py in __init__(self, brokers,  topic)   4 class kafkaconnection():   5   def __init__(self, brokers, topic): ----> 6     self._kafka_producer =  kafkaproducer(bootstrap_servers=brokers)   7     self._topic = topic   8       /home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py  in __init__(self, **configs) 333  334         client = kafkaclient(metrics=self._metrics,  metric_group_prefix='producer', --> 335                              **self.config) 336  337         # auto-discovered version client if necessary  /home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in  __init__(self, **configs) 208         if self.config['api_version'] none: 209             check_timeout =  self.config['api_version_auto_timeout_ms'] / 1000 --> 210             self.config['api_version'] =  self.check_version(timeout=check_timeout) 211  212     def _bootstrap(self, hosts):  /home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in  check_version(self, node_id, timeout, strict) 806             try_node = node_id or self.least_loaded_node() 807             if try_node none: --> 808                 raise errors.nobrokersavailable() 809             self._maybe_connect(try_node) 810             conn = self._conns[try_node]  nobrokersavailable: nobrokersavailable 

i tried connecting outside of workbench through cli having vpn connection. got same error.

any pointers on missing? in advance!


Comments

Popular posts from this blog

python Tkinter Capturing keyboard events save as one single string -

android - InAppBilling registering BroadcastReceiver in AndroidManifest -

javascript - Z-index in d3.js -