java - My Kafka sink connector for Neo4j fails to load
introduction:
Let me start by apologizing for any vagueness in my question. I'll try to provide as much information on this topic as I can (hopefully not too much), and please let me know if I should provide more. Also, I'm quite new to Kafka and will probably stumble on terminology.
So, from my understanding of how sinks and sources work, I can use the FileStreamSourceConnector provided by the Kafka Quickstart guide to write data (Neo4j commands) to a topic held in a Kafka cluster. Then I can write my own Neo4j sink connector and task to read those commands and send them to one or more Neo4j servers. To keep the project as simple as possible, for now, I based the sink connector and task on the Kafka Quickstart guide's FileStreamSinkConnector and FileStreamSinkTask.
kafka's filestream:
my neo4j sink connector:
    package neo4k.sink;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;
    import org.apache.kafka.common.utils.AppInfoParser;
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.sink.SinkConnector;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class Neo4jSinkConnector extends SinkConnector {

        // Constant holder for the configuration keys (an enum with no values).
        public enum Keys {
            ;
            static final String URI = "uri";
            static final String USER = "user";
            static final String PASS = "pass";
            static final String LOG = "log";
        }

        private static final ConfigDef CONFIG_DEF = new ConfigDef()
                .define(Keys.URI, Type.STRING, "", Importance.HIGH, "Neo4j URI")
                .define(Keys.USER, Type.STRING, "", Importance.MEDIUM, "User Auth")
                .define(Keys.PASS, Type.STRING, "", Importance.MEDIUM, "Pass Auth")
                .define(Keys.LOG, Type.STRING, "./neoj4sinkconnecterlog.txt", Importance.LOW, "Log File");

        private String uri;
        private String user;
        private String pass;
        private String logFile;

        @Override
        public String version() {
            return AppInfoParser.getVersion();
        }

        @Override
        public void start(Map<String, String> props) {
            uri = props.get(Keys.URI);
            user = props.get(Keys.USER);
            pass = props.get(Keys.PASS);
            logFile = props.get(Keys.LOG);
        }

        @Override
        public Class<? extends Task> taskClass() {
            return Neo4jSinkTask.class;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            // Every task receives the same connection settings.
            ArrayList<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < maxTasks; i++) {
                Map<String, String> config = new HashMap<>();
                if (uri != null) config.put(Keys.URI, uri);
                if (user != null) config.put(Keys.USER, user);
                if (pass != null) config.put(Keys.PASS, pass);
                if (logFile != null) config.put(Keys.LOG, logFile);
                configs.add(config);
            }
            return configs;
        }

        @Override
        public void stop() {
        }

        @Override
        public ConfigDef config() {
            return CONFIG_DEF;
        }
    }
my neo4j sink task:
    package neo4k.sink;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;
    import org.neo4j.driver.v1.AuthTokens;
    import org.neo4j.driver.v1.Driver;
    import org.neo4j.driver.v1.GraphDatabase;
    import org.neo4j.driver.v1.Session;
    import org.neo4j.driver.v1.StatementResult;
    import org.neo4j.driver.v1.exceptions.Neo4jException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Collection;
    import java.util.Map;

    public class Neo4jSinkTask extends SinkTask {

        private static final Logger log = LoggerFactory.getLogger(Neo4jSinkTask.class);

        private String uri;
        private String user;
        private String pass;
        private String logFile;

        private Driver driver;
        private Session session;

        public Neo4jSinkTask() {
        }

        @Override
        public String version() {
            return new Neo4jSinkConnector().version();
        }

        @Override
        public void start(Map<String, String> props) {
            uri = props.get(Neo4jSinkConnector.Keys.URI);
            user = props.get(Neo4jSinkConnector.Keys.USER);
            pass = props.get(Neo4jSinkConnector.Keys.PASS);
            logFile = props.get(Neo4jSinkConnector.Keys.LOG);

            driver = null;
            session = null;

            // Open one driver and one session for the lifetime of the task.
            try {
                driver = GraphDatabase.driver(uri, AuthTokens.basic(user, pass));
                session = driver.session();
            } catch (Neo4jException ex) {
                log.trace(ex.getMessage(), logFilename());
            }
        }

        @Override
        public void put(Collection<SinkRecord> sinkRecords) {
            // Each record value is expected to be a Neo4j command; run it as-is.
            StatementResult result;
            for (SinkRecord record : sinkRecords) {
                result = session.run(record.value().toString());
                log.trace(result.toString(), logFilename());
            }
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }

        @Override
        public void stop() {
            if (session != null) session.close();
            if (driver != null) driver.close();
        }

        private String logFilename() {
            return logFile == null ? "stdout" : logFile;
        }
    }
the issue:
After writing that, I built it, including any dependencies it had but excluding the Kafka dependencies, into a jar (or uber jar? It was one file). Then I edited the plugin path in connect-standalone.properties to include that artifact and wrote a properties file for my Neo4j sink connector. I did all of this in an attempt to follow these guidelines.
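For reference, the worker setting involved here is `plugin.path`, a comma-separated list of locations that Kafka Connect scans for plugins at startup. The fragment below is only a sketch: the directory `/opt/connect-plugins` is a hypothetical location, not the path used in this question.

```properties
# connect-standalone.properties (fragment)
# Hypothetical directory that holds the connector's uber jar.
plugin.path=/opt/connect-plugins
```

If the worker picks the jar up, the connector class should show up among the "Added plugin" lines in the startup output.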
my neo4j sink connector properties file:
    name=neo4k-sink
    connector.class=neo4k.sink.Neo4jSinkConnector
    tasks.max=1
    uri=bolt://localhost:7687
    user=neo4j
    pass=hunter2
    topics=connect-test
But upon running the standalone worker, I get this error in the output, which shuts down the stream (error on line 5):
    [2017-08-14 12:59:00,150] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
    [2017-08-14 12:59:00,150] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
    [2017-08-14 12:59:00,153] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
    [2017-08-14 12:59:00,153] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
    [2017-08-14 12:59:00,153] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
    java.lang.IllegalArgumentException: Malformed \uxxxx encoding.
        at java.util.Properties.loadConvert(Properties.java:574)
        at java.util.Properties.load0(Properties.java:390)
        at java.util.Properties.load(Properties.java:341)
        at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:429)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:84)
    [2017-08-14 12:59:00,156] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
    [2017-08-14 12:59:00,156] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
    [2017-08-14 12:59:00,168] INFO Stopped ServerConnector@540accf4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
    [2017-08-14 12:59:00,173] INFO Stopped o.e.j.s.ServletContextHandler@6d548d27{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
Edit: I should mention that during the part of the connector loading where the output declares which plugins have been added, I do not see any mention of the jar that I built earlier and created a path for in connect-standalone.properties. Here's a snippet for context:
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,970] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
conclusion:
I am at a loss. I've done testing and researching for a couple of hours, and I don't think I'm exactly sure what question to ask. So I'll say thank you for reading if you've gotten this far. If you noticed anything glaring that I may have done wrong in the code or in my method (e.g. packaging the jar), or think I should provide more context or console logs or anything else, please let me know. Thank you, again.
As pointed out by @Randall Hauch, my properties file had hidden characters in it because it was a rich text document. I fixed this by duplicating the connect-file-sink.properties file provided with Kafka, which I believe is a regular text document, and then renaming and editing that duplicate for my Neo4j sink properties.
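To illustrate why hidden rich-text markup can trigger this exact exception, here is a minimal sketch that feeds two strings to `java.util.Properties.load`, the same call that appears in the stack trace. The class name `PropertiesEncodingCheck`, the helper `tryLoad`, and the sample rich-text string are assumptions made for illustration; the real file's contents are not shown in the question. Rich text formats typically contain control words such as `\uc1`, and the properties parser treats any `\u` as the start of a four-digit hexadecimal Unicode escape.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropertiesEncodingCheck {

    /**
     * Tries to parse the text as a properties file.
     * Returns null on success, otherwise the parser's error message.
     */
    static String tryLoad(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
            return null;
        } catch (IllegalArgumentException | IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Plain-text properties, as in the sink connector file above.
        String plain = "name=neo4k-sink\ntopics=connect-test\n";

        // Hypothetical rich-text wrapper: "\\uc1" followed by "\\pard" is not
        // a valid backslash-u escape, because the escape needs four hex digits.
        String richText = "{\\rtf1\\ansi\\uc1\\pard name=neo4k-sink}\n";

        System.out.println("plain text: " + tryLoad(plain));
        System.out.println("rich text:  " + tryLoad(richText));
    }
}
```

Running a check like this against the actual file contents before starting the worker is one way to confirm whether the file is plain text.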