Apache Kafka - JDBC source connector producing duplicate messages


I am using the Confluent 3.2.1 platform. My use case is to incrementally read an Oracle table based on a timestamp column and send the records to a Kafka topic using the JDBC source connector. I am running Connect in distributed mode on a 3-node cluster. The connector config is below --

{
  "name" : "voice_source_connector_jdbc",
  "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:oracle:thin:crystal/crystal123@x.x.x.x:1521:testdb",
    "mode" : "timestamp",
    "tasks.max" : 1,
    "query" : "select * from crystal.voiceedrs",
    "timestamp.column.name" : "rec_timestamp",
    "topic.prefix" : "voice"
  }
}

I have tried many times with different combinations of parameters, but every time I get duplicate rows for the inserts that follow the first insert into the Oracle table. If I insert 100 rows at the beginning, I get 100 messages in the topic; if I then insert 100 more rows into the table, 200 messages are produced in the topic, making a total of 300.

I set the log level to DEBUG, tested one record at a time, and found the following --

1) I insert 1 row into the blank Oracle table --

msisdn,calldatetime,call_direction,called_num,call_duration,rec_timestamp
9831218547,2017-06-03 00:24:06,1,9831220190,1187,2017-08-14 15:11:02.563

I receive 1 message in the voice topic --

{"msisdn":{"string":"9831218547"},"calldatetime":{"long":1496449446000},"call_direction":{"string":"1"},"called_num":{"string":"9831220190"},"call_duration":{"string":"1187"},"rec_timestamp":1502723462563}

I looked at the topic "stream-connect-offsets" and found the below message in partition 7 --

offset: 0 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 3729454508/3729454508 compression: none {"timestamp_nanos":563000000,"timestamp":1502723462563}
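As a quick sanity check (my own sketch, not part of the connector), the "timestamp" value stored in the offset is simply the row's rec_timestamp in epoch milliseconds:

```python
from datetime import datetime, timezone

# The committed offset stores rec_timestamp as epoch milliseconds (UTC).
offset_millis = 1502723462563

# Split into whole seconds and leftover milliseconds to avoid float rounding.
secs, millis = divmod(offset_millis, 1000)
dt = datetime.fromtimestamp(secs, tz=timezone.utc)

print(f"{dt:%Y-%m-%d %H:%M:%S}.{millis:03d}")  # 2017-08-14 15:11:02.563
```

So the offset itself was committed correctly for the first row.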

The log on one Connect node says --

[2017-08-14 15:11:10,464] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from crystal.voiceedrs', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from crystal.voiceedrs where "rec_timestamp" > ? and "rec_timestamp" < ? order by "rec_timestamp" asc (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:11:10,464] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:11:10,466] DEBUG executing prepared statement with timestamp value = 1970-01-01 00:00:00.000 end time = 2017-08-14 15:11:06.917 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)

[2017-08-14 15:11:10,474] DEBUG About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)

[2017-08-14 15:11:10,523] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro .....

2) I insert a 2nd row into the same Oracle table, and the table now looks like --

msisdn,calldatetime,call_direction,called_num,call_duration,rec_timestamp
9831218547,2017-06-03 00:24:06,1,9831220190,1187,2017-08-14 15:11:02.563
9831212081,2017-08-04 07:43:07,2,7980240223,1013,2017-08-14 15:15:04.034

I receive 2 more messages in the Kafka topic, and the topic now looks like --

{"msisdn":{"string":"9831218547"},"calldatetime":{"long":1496449446000},"call_direction":{"string":"1"},"called_num":{"string":"9831220190"},"call_duration":{"string":"1187"},"rec_timestamp":1502723462563}
{"msisdn":{"string":"9831212081"},"calldatetime":{"long":1501832587000},"call_direction":{"string":"2"},"called_num":{"string":"7980240223"},"call_duration":{"string":"1013"},"rec_timestamp":1502723704034}
{"msisdn":{"string":"9831212081"},"calldatetime":{"long":1501832587000},"call_direction":{"string":"2"},"called_num":{"string":"7980240223"},"call_duration":{"string":"1013"},"rec_timestamp":1502723704034}

Obviously the 2nd and 3rd messages are duplicates...

Partition 7 of the topic "stream-connect-offsets" now has a total of 3 messages --

offset: 0 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 3729454508/3729454508 compression: none {"timestamp_nanos":563000000,"timestamp":1502723462563}
offset: 1 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 2063511223/2063511223 compression: none {"timestamp_nanos":34000000,"timestamp":1502723704034}
offset: 2 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 2063511223/2063511223 compression: none {"timestamp_nanos":34000000,"timestamp":1502723704034}

Additional log messages from 2 of the Connect nodes --

From node-1:

[2017-08-14 15:15:10,758] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from crystal.voiceedrs', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from crystal.voiceedrs where "rec_timestamp" > ? and "rec_timestamp" < ? order by "rec_timestamp" asc (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:15:10,759] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:15:10,760] DEBUG executing prepared statement with timestamp value = 2017-08-14 15:11:02.563 end time = 2017-08-14 15:15:07.200 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)

[2017-08-14 15:15:10,762] DEBUG About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)
[2017-08-14 15:15:10,763] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:161)

From node-2:

[2017-08-14 15:15:11,169] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from crystal.voiceedrs', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from crystal.voiceedrs where "rec_timestamp" > ? and "rec_timestamp" < ? order by "rec_timestamp" asc (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:15:11,169] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:15:11,171] DEBUG executing prepared statement with timestamp value = 2017-08-14 15:11:02.563 end time = 2017-08-14 15:15:08.815 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)

[2017-08-14 15:15:11,174] DEBUG About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)

[2017-08-14 15:15:11,277] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro .....
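Reading the two nodes' logs together, both executed the prepared statement with the same start offset (timestamp value = 2017-08-14 15:11:02.563). A hypothetical Python simplification of the timestamp-mode window (my sketch, not the connector's actual code) shows why two pollers starting from the same offset both pick up the new row:

```python
# Hypothetical sketch of timestamp-mode polling: select rows with
# last_committed < rec_timestamp < db_now, ordered ascending.
def poll(rows, last_committed_ms, db_now_ms):
    return sorted(
        (r for r in rows if last_committed_ms < r["rec_timestamp"] < db_now_ms),
        key=lambda r: r["rec_timestamp"],
    )

rows = [
    {"msisdn": "9831218547", "rec_timestamp": 1502723462563},  # first insert
    {"msisdn": "9831212081", "rec_timestamp": 1502723704034},  # second insert
]

last = 1502723462563  # offset committed after the first row (15:11:02.563)

# Both node-1 and node-2 ran the query against the same start offset,
# with end times 15:15:07.200 and 15:15:08.815 respectively:
node1 = poll(rows, last, 1502723707200)
node2 = poll(rows, last, 1502723708815)

print(len(node1) + len(node2))  # 2 -> the second row is produced twice
```

That matches what I see: the duplicate appears only when more than one node runs the same task.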

I am stuck here. Can someone please guide me and give expert comments on what may be going wrong and how I should handle this? Please let me know in case I need to provide more information (configuration etc.) regarding my setup.

Thanks in advance. Any help is appreciated.
