java - My Kafka sink connector for Neo4j fails to load


Introduction:

Let me start by apologizing for any vagueness in this question; I'll try to provide as much information on the topic as I can (hopefully not too much), and please let me know if I should provide more. I'm quite new to Kafka and still stumble over the terminology.

So, from my understanding of how sinks and sources work, I can use the FileStreamSourceConnector provided by the Kafka quickstart guide to write data (Neo4j commands) to a topic held in a Kafka cluster. I can then write my own Neo4j sink connector and task to read those commands and send them to one or more Neo4j servers. To keep the project as simple as possible, for now, I based my sink connector and task off of the Kafka quickstart guide's FileStreamSinkConnector and FileStreamSinkTask.

Kafka's FileStream:

FileStreamSourceConnector

FileStreamSourceTask

FileStreamSinkConnector

FileStreamSinkTask

My Neo4j sink connector:

    package neo4k.sink;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;
    import org.apache.kafka.common.utils.AppInfoParser;
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.sink.SinkConnector;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class Neo4jSinkConnector extends SinkConnector {

        public enum Keys {
            ;
            static final String URI = "uri";
            static final String USER = "user";
            static final String PASS = "pass";
            static final String LOG = "log";
        }

        private static final ConfigDef CONFIG_DEF = new ConfigDef()
                .define(Keys.URI, Type.STRING, "", Importance.HIGH, "Neo4j URI")
                .define(Keys.USER, Type.STRING, "", Importance.MEDIUM, "User auth")
                .define(Keys.PASS, Type.STRING, "", Importance.MEDIUM, "Pass auth")
                .define(Keys.LOG, Type.STRING, "./neoj4sinkconnecterlog.txt", Importance.LOW, "Log file");

        private String uri;
        private String user;
        private String pass;
        private String logFile;

        @Override
        public String version() {
            return AppInfoParser.getVersion();
        }

        @Override
        public void start(Map<String, String> props) {
            uri = props.get(Keys.URI);
            user = props.get(Keys.USER);
            pass = props.get(Keys.PASS);
            logFile = props.get(Keys.LOG);
        }

        @Override
        public Class<? extends Task> taskClass() {
            return Neo4jSinkTask.class;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            ArrayList<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < maxTasks; i++) {
                Map<String, String> config = new HashMap<>();
                if (uri != null)
                    config.put(Keys.URI, uri);
                if (user != null)
                    config.put(Keys.USER, user);
                if (pass != null)
                    config.put(Keys.PASS, pass);
                if (logFile != null)
                    config.put(Keys.LOG, logFile);
                configs.add(config);
            }
            return configs;
        }

        @Override
        public void stop() {
        }

        @Override
        public ConfigDef config() {
            return CONFIG_DEF;
        }
    }

My Neo4j sink task:

    package neo4k.sink;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;
    import org.neo4j.driver.v1.AuthTokens;
    import org.neo4j.driver.v1.Driver;
    import org.neo4j.driver.v1.GraphDatabase;
    import org.neo4j.driver.v1.Session;
    import org.neo4j.driver.v1.StatementResult;
    import org.neo4j.driver.v1.exceptions.Neo4jException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Collection;
    import java.util.Map;

    public class Neo4jSinkTask extends SinkTask {

        private static final Logger log = LoggerFactory.getLogger(Neo4jSinkTask.class);

        private String uri;
        private String user;
        private String pass;
        private String logFile;

        private Driver driver;
        private Session session;

        public Neo4jSinkTask() {
        }

        @Override
        public String version() {
            return new Neo4jSinkConnector().version();
        }

        @Override
        public void start(Map<String, String> props) {
            uri = props.get(Neo4jSinkConnector.Keys.URI);
            user = props.get(Neo4jSinkConnector.Keys.USER);
            pass = props.get(Neo4jSinkConnector.Keys.PASS);
            logFile = props.get(Neo4jSinkConnector.Keys.LOG);

            driver = null;
            session = null;

            try {
                driver = GraphDatabase.driver(uri, AuthTokens.basic(user, pass));
                session = driver.session();
            } catch (Neo4jException ex) {
                log.trace(ex.getMessage(), logFileName());
            }
        }

        @Override
        public void put(Collection<SinkRecord> sinkRecords) {
            StatementResult result;
            for (SinkRecord record : sinkRecords) {
                result = session.run(record.value().toString());
                log.trace(result.toString(), logFileName());
            }
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }

        @Override
        public void stop() {
            if (session != null)
                session.close();
            if (driver != null)
                driver.close();
        }

        private String logFileName() {
            return logFile == null ? "stdout" : logFile;
        }
    }

The issue:

After writing that, I next built a jar (or uber jar? It was one file) including the dependencies I had, excluding the Kafka dependencies. I then edited the plugin pathways in connect-standalone.properties to include that artifact, and wrote a properties file for my Neo4j sink connector. I did all of this in an attempt to follow these guidelines.

My Neo4j sink connector properties file:

    name=neo4k-sink
    connector.class=neo4k.sink.Neo4jSinkConnector
    tasks.max=1
    uri=bolt://localhost:7687
    user=neo4j
    pass=hunter2
    topics=connect-test

But upon running the standalone worker, I get an error in the output that shuts down the stream (error on line 5):

    [2017-08-14 12:59:00,150] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
    [2017-08-14 12:59:00,150] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
    [2017-08-14 12:59:00,153] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
    [2017-08-14 12:59:00,153] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
    [2017-08-14 12:59:00,153] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
    java.lang.IllegalArgumentException: Malformed \uxxxx encoding.
        at java.util.Properties.loadConvert(Properties.java:574)
        at java.util.Properties.load0(Properties.java:390)
        at java.util.Properties.load(Properties.java:341)
        at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:429)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:84)
    [2017-08-14 12:59:00,156] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
    [2017-08-14 12:59:00,156] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
    [2017-08-14 12:59:00,168] INFO Stopped ServerConnector@540accf4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
    [2017-08-14 12:59:00,173] INFO Stopped o.e.j.s.ServletContextHandler@6d548d27{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
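For what it's worth, that exception comes from java.util.Properties itself, before any connector code runs: in a .properties file, a backslash followed by `u` starts a \uXXXX unicode escape, and anything other than four hex digits after it aborts the load. A minimal sketch reproducing the failure (the `bolt:\u00zz` value is a made-up example of the kind of junk a rich-text editor can leave in a file):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropertiesEscapeCheck {

    // Returns null if the properties text loads cleanly, or the parse
    // error message if it contains e.g. a malformed \uXXXX escape.
    static String loadError(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
            return null;
        } catch (IOException | IllegalArgumentException ex) {
            return ex.getMessage();
        }
    }

    public static void main(String[] args) {
        // A plain ASCII line loads fine
        System.out.println(loadError("uri=bolt://localhost:7687"));
        // A stray backslash-u followed by non-hex characters fails,
        // just like the stack trace in the question
        System.out.println(loadError("uri=bolt:\\u00zz"));
    }
}
```

The takeaway is that the error is about the bytes of the properties file itself, not about the connector's code or packaging.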

Edit: I should mention that during the part of connector loading where the output declares which plugins have been added, I do not see any mention of the jar that I built earlier and for which I created a pathway in connect-standalone.properties. Here's a snippet for context:

    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [2017-08-14 12:58:58,970] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
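For context, the worker setting involved here is `plugin.path` in connect-standalone.properties, which in Kafka 0.11 takes a comma-separated list of directories to scan for connector jars. A sketch of the relevant excerpt, assuming the uber jar was copied to a hypothetical /opt/connectors directory (the path is a placeholder, not the one actually used):

```
# connect-standalone.properties (excerpt)
bootstrap.servers=localhost:9092

# Comma-separated list of directories containing Connect plugin jars.
# /opt/connectors stands in for wherever the uber jar was placed.
plugin.path=/opt/connectors
```

If the jar sits on that path and still isn't listed among the "Added plugin" lines, that usually points at the path or the jar's contents rather than the connector class itself.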

Conclusion:

I am at a loss. I've done a couple hours of testing and researching, and I don't think I'm even sure what question to ask. So I'll just say thank you for reading if you've gotten this far. If you noticed anything glaring I may have done wrong in the code or in my method (e.g. packaging the jar), or if you think I should provide more context or console logs or anything, please let me know. Thank you, again.

Edit: As pointed out by @Randall Hauch, my properties file had hidden characters within it because it was a rich text document. I fixed this by duplicating the connect-file-sink.properties file provided with Kafka, which I believe is a regular text document, then renaming and editing that duplicate for my Neo4j sink properties.
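Along the same lines as that fix, hidden rich-text characters can also be hunted down programmatically. A small sketch (not part of the original fix, just a diagnostic idea) that scans a file's raw bytes for anything outside plain printable ASCII, the way curly quotes or a BOM from a rich-text editor would show up:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class HiddenCharFinder {

    // Returns the 0-based offsets of bytes that are not printable ASCII
    // or common whitespace, e.g. the multi-byte sequences a rich-text
    // editor emits for curly quotes.
    static List<Integer> nonAsciiOffsets(byte[] data) {
        List<Integer> offsets = new ArrayList<>();
        for (int i = 0; i < data.length; i++) {
            int b = data[i] & 0xFF;
            boolean whitespace = b == '\n' || b == '\r' || b == '\t';
            if (b > 0x7E || (b < 0x20 && !whitespace)) {
                offsets.add(i);
            }
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        byte[] clean = "uri=bolt://localhost:7687\n".getBytes(StandardCharsets.UTF_8);
        byte[] dirty = "name=\u201Cneo4k\u201D\n".getBytes(StandardCharsets.UTF_8);
        System.out.println("clean sample: " + nonAsciiOffsets(clean));
        System.out.println("dirty sample: " + nonAsciiOffsets(dirty));
        // To check a real file: nonAsciiOffsets(Files.readAllBytes(Paths.get("neo4k-sink.properties")))
    }
}
```

An empty result means the file is plain ASCII and safe for java.util.Properties; any reported offsets are the bytes worth inspecting in a hex editor.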

