No Brokers available when trying to connect to Kafka through Cloudera Data Science Workbench -
i trying implement github project (https://github.com/tomatotomahto/cdh-sensor-analytics) on our internal hadoop cluster via cloudera data science workbench.
on running project on cloudera data science workbench, error "no brokers available" when trying connect kafka through python api kafkaproducer(bootstrap_servers='broker1:9092') [code can found in https://github.com/tomatotomahto/cdh-sensor-analytics/blob/master/datagenerator/kafkaconnection.py].
i have authenticated using kerberos. have tried giving broker node without port number, , list. but, nothing has worked far.
below stack trace.
nobrokersavailable: nobrokersavailable nobrokersavailable traceback (most recent call last) in engine ----> 1 dgen = datagenerator(config) /home/cdsw/datagenerator/datagenerator.py in __init__(self, config) 39 40 self._kudu = kuduconnection(self._config['kudu_master'], self._config['kudu_port'], spark) ---> 41 self._kafka = kafkaconnection(self._config['kafka_brokers'], self._config['kafka_topic']) 42 43 #self._kafka /home/cdsw/datagenerator/kafkaconnection.py in __init__(self, brokers, topic) 4 class kafkaconnection(): 5 def __init__(self, brokers, topic): ----> 6 self._kafka_producer = kafkaproducer(bootstrap_servers=brokers) 7 self._topic = topic 8 /home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py in __init__(self, **configs) 333 334 client = kafkaclient(metrics=self._metrics, metric_group_prefix='producer', --> 335 **self.config) 336 337 # auto-discovered version client if necessary /home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in __init__(self, **configs) 208 if self.config['api_version'] none: 209 check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 --> 210 self.config['api_version'] = self.check_version(timeout=check_timeout) 211 212 def _bootstrap(self, hosts): /home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict) 806 try_node = node_id or self.least_loaded_node() 807 if try_node none: --> 808 raise errors.nobrokersavailable() 809 self._maybe_connect(try_node) 810 conn = self._conns[try_node] nobrokersavailable: nobrokersavailable
i tried connecting outside of workbench through cli having vpn connection. got same error.
any pointers on missing? in advance!
Comments
Post a Comment