No Brokers available when trying to connect to Kafka through Cloudera Data Science Workbench


I am trying to implement a GitHub project (https://github.com/tomatotomahto/cdh-sensor-analytics) on our internal Hadoop cluster via Cloudera Data Science Workbench.

When I run the project on Cloudera Data Science Workbench, I get the error "No Brokers available" when trying to connect to Kafka through the Python API call KafkaProducer(bootstrap_servers='broker1:9092') [the code can be found at https://github.com/tomatotomahto/cdh-sensor-analytics/blob/master/datagenerator/kafkaconnection.py].

I have authenticated using Kerberos. I have tried giving the broker node without the port number, and also as a list, but nothing has worked so far.
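One thing that may be worth checking here: holding a Kerberos ticket does not by itself make kafka-python use SASL, because the client's default security_protocol is PLAINTEXT. The following is only a minimal sketch of passing the SASL settings explicitly. The helper names build_producer_config and make_producer are hypothetical, and the values SASL_PLAINTEXT, the service name 'kafka', and port 9092 are assumptions about the cluster that would need to be confirmed with its administrator. GSSAPI support in kafka-python also requires the separate gssapi package.

```python
def build_producer_config(brokers, default_port=9092, kerberos=True):
    """Build keyword arguments for kafka.KafkaProducer (hypothetical helper).

    `brokers` may be a comma-separated string or a list of 'host[:port]'.
    """
    if isinstance(brokers, str):
        brokers = brokers.split(",")

    servers = []
    for broker in brokers:
        broker = broker.strip()
        if not broker:
            continue
        # Append the assumed default port when none is given.
        if ":" not in broker:
            broker = "{}:{}".format(broker, default_port)
        servers.append(broker)

    config = {"bootstrap_servers": servers}
    if kerberos:
        # Assumed values for a Kerberized cluster; adjust to the real setup
        # (for example SASL_SSL if the brokers also use TLS).
        config.update(
            security_protocol="SASL_PLAINTEXT",
            sasl_mechanism="GSSAPI",
            sasl_kerberos_service_name="kafka",
        )
    return config


def make_producer(brokers):
    # Imported lazily so the config helper can be used without kafka-python.
    from kafka import KafkaProducer
    return KafkaProducer(**build_producer_config(brokers))
```

For example, build_producer_config("broker1") yields a bootstrap_servers list of ['broker1:9092'] plus the three SASL settings, which make_producer then passes to KafkaProducer.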

Below is the stack trace.

NoBrokersAvailable: NoBrokersAvailable
NoBrokersAvailable                        Traceback (most recent call last)
in engine
----> 1 dgen = datagenerator(config)

/home/cdsw/datagenerator/datagenerator.py in __init__(self, config)
     39
     40         self._kudu = kuduconnection(self._config['kudu_master'], self._config['kudu_port'], spark)
---> 41         self._kafka = kafkaconnection(self._config['kafka_brokers'], self._config['kafka_topic'])
     42
     43         #self._kafka

/home/cdsw/datagenerator/kafkaconnection.py in __init__(self, brokers, topic)
      4 class kafkaconnection():
      5   def __init__(self, brokers, topic):
----> 6     self._kafka_producer = KafkaProducer(bootstrap_servers=brokers)
      7     self._topic = topic
      8

/home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
    333
    334         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
--> 335                              **self.config)
    336
    337         # Get auto-discovered version from client if necessary

/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in __init__(self, **configs)
    208         if self.config['api_version'] is None:
    209             check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 210             self.config['api_version'] = self.check_version(timeout=check_timeout)
    211
    212     def _bootstrap(self, hosts):

/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
    806             try_node = node_id or self.least_loaded_node()
    807             if try_node is None:
--> 808                 raise Errors.NoBrokersAvailable()
    809             self._maybe_connect(try_node)
    810             conn = self._conns[try_node]

NoBrokersAvailable: NoBrokersAvailable

I also tried connecting from outside the Workbench, through the CLI over a VPN connection, and got the same error.
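Since the same error appears from two different networks, it may help to rule out plain connectivity before looking at the Kafka client itself. Below is a minimal sketch using only the Python standard library; the helper name can_reach is hypothetical, and the host and port in the usage note are simply the values from the question.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened.

    This only checks DNS resolution and TCP reachability; it says nothing
    about Kafka authentication or protocol settings.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

Calling can_reach("broker1", 9092) from both the Workbench session and the VPN machine would show whether the broker port is reachable at all. If it returns False, the cause is more likely DNS, firewall, or a wrong port than the producer code. If it returns True, the problem may lie in the security settings or in the listener addresses the brokers advertise.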

Any pointers on what I am missing? Thanks in advance!

