apache kafka - JDBC source connector producing duplicate messages -


I am using the Confluent 3.2.1 platform. My use case is to incrementally read an Oracle table based on a timestamp column and send the records to a Kafka topic using the JDBC source connector. I am running Connect in distributed mode on a 3-node cluster. The connector config is below --

{
  "name" : "voice_source_connector_jdbc",
  "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:oracle:thin:crystal/crystal123@x.x.x.x:1521:testdb",
    "mode" : "timestamp",
    "tasks.max" : 1,
    "query" : "select * from crystal.voiceedrs",
    "timestamp.column.name" : "rec_timestamp",
    "topic.prefix" : "voice"
  }
}

I have tried many different combinations of parameters, but every time I get duplicate rows for inserts into the Oracle table that come after the first insert. If I insert 100 rows at the beginning, I get 100 messages in the topic; if I then insert another 100 rows into the table, 200 messages are produced in the topic, making a total of 300.
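To quantify the duplication I checked for repeated (msisdn, rec_timestamp) pairs in the topic. A minimal Python sketch of that check (the messages here are hard-coded samples matching my data; in practice they would come from a Kafka consumer):

```python
import json

# Sample messages as they appear in the "voice" topic (hard-coded for illustration).
messages = [
    '{"msisdn": {"string": "9831218547"}, "rec_timestamp": 1502723462563}',
    '{"msisdn": {"string": "9831212081"}, "rec_timestamp": 1502723704034}',
    '{"msisdn": {"string": "9831212081"}, "rec_timestamp": 1502723704034}',
]

def count_duplicates(raw_messages):
    """Count messages whose (msisdn, rec_timestamp) pair was already seen."""
    seen, dupes = set(), 0
    for raw in raw_messages:
        rec = json.loads(raw)
        key = (rec["msisdn"]["string"], rec["rec_timestamp"])
        if key in seen:
            dupes += 1
        seen.add(key)
    return dupes

print(count_duplicates(messages))  # prints 1: one duplicate among 3 messages
```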

I set the log level to DEBUG and tested one record at a time, and found the following --

1) Inserted 1 row into the blank Oracle table --

msisdn,calldatetime,call_direction,called_num,call_duration,rec_timestamp
9831218547,2017-06-03 00:24:06,1,9831220190,1187,2017-08-14 15:11:02.563

I receive 1 message in the voice topic --

{"msisdn":{"string":"9831218547"},"calldatetime":{"long":1496449446000},"call_direction":{"string":"1"},"called_num":{"string":"9831220190"},"call_duration":{"string":"1187"},"rec_timestamp":1502723462563}

I looked into the topic "stream-connect-offsets" and found the below message in partition 7 --

offset: 0 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 3729454508/3729454508 compression: none {"timestamp_nanos":563000000,"timestamp":1502723462563}

The log on one Connect node says --

[2017-08-14 15:11:10,464] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from crystal.voiceedrs', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from crystal.voiceedrs WHERE "rec_timestamp" > ? AND "rec_timestamp" < ? ORDER BY "rec_timestamp" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:11:10,464] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:11:10,466] DEBUG Executing prepared statement with timestamp value = 1970-01-01 00:00:00.000 end time = 2017-08-14 15:11:06.917 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)
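As I understand it from the log (my reading, not the connector source), the querier selects rows whose timestamp falls strictly between the last committed offset and the current database time. A rough Python model of that window:

```python
from datetime import datetime

def poll_window(rows, last_committed, db_now):
    """Model of the prepared statement:
    "rec_timestamp" > last_committed AND "rec_timestamp" < db_now,
    ordered ascending (a simplification of TimestampIncrementingTableQuerier)."""
    return sorted(
        (r for r in rows if last_committed < r["rec_timestamp"] < db_now),
        key=lambda r: r["rec_timestamp"],
    )

rows = [{"msisdn": "9831218547",
         "rec_timestamp": datetime(2017, 8, 14, 15, 11, 2, 563000)}]

# First poll: the offset starts at the epoch, so the new row falls in the window.
first = poll_window(rows,
                    last_committed=datetime(1970, 1, 1),
                    db_now=datetime(2017, 8, 14, 15, 11, 6, 917000))
print(len(first))  # prints 1
```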

[2017-08-14 15:11:10,474] DEBUG About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)

[2017-08-14 15:11:10,523] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro .....

2) Inserted a 2nd row into the same Oracle table, and the table now looks like --

msisdn,calldatetime,call_direction,called_num,call_duration,rec_timestamp
9831218547,2017-06-03 00:24:06,1,9831220190,1187,2017-08-14 15:11:02.563
9831212081,2017-08-04 07:43:07,2,7980240223,1013,2017-08-14 15:15:04.034

I receive 2 more messages in the Kafka topic, which now looks like --

{"msisdn":{"string":"9831218547"},"calldatetime":{"long":1496449446000},"call_direction":{"string":"1"},"called_num":{"string":"9831220190"},"call_duration":{"string":"1187"},"rec_timestamp":1502723462563}
{"msisdn":{"string":"9831212081"},"calldatetime":{"long":1501832587000},"call_direction":{"string":"2"},"called_num":{"string":"7980240223"},"call_duration":{"string":"1013"},"rec_timestamp":1502723704034}
{"msisdn":{"string":"9831212081"},"calldatetime":{"long":1501832587000},"call_direction":{"string":"2"},"called_num":{"string":"7980240223"},"call_duration":{"string":"1013"},"rec_timestamp":1502723704034}

Obviously the 2nd and 3rd messages are duplicates...

Partition 7 of the topic "stream-connect-offsets" now has a total of 3 messages --

offset: 0 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 3729454508/3729454508 compression: none {"timestamp_nanos":563000000,"timestamp":1502723462563}
offset: 1 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 2063511223/2063511223 compression: none {"timestamp_nanos":34000000,"timestamp":1502723704034}
offset: 2 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 2063511223/2063511223 compression: none {"timestamp_nanos":34000000,"timestamp":1502723704034}

Additional log messages from the 2 Connect nodes --

From node-1 --

[2017-08-14 15:15:10,758] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from crystal.voiceedrs', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from crystal.voiceedrs WHERE "rec_timestamp" > ? AND "rec_timestamp" < ? ORDER BY "rec_timestamp" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:15:10,759] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:15:10,760] DEBUG Executing prepared statement with timestamp value = 2017-08-14 15:11:02.563 end time = 2017-08-14 15:15:07.200 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)

[2017-08-14 15:15:10,762] DEBUG About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)
[2017-08-14 15:15:10,763] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:161)

From node-2 --

[2017-08-14 15:15:11,169] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from crystal.voiceedrs', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from crystal.voiceedrs WHERE "rec_timestamp" > ? AND "rec_timestamp" < ? ORDER BY "rec_timestamp" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:15:11,169] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:15:11,171] DEBUG Executing prepared statement with timestamp value = 2017-08-14 15:11:02.563 end time = 2017-08-14 15:15:08.815 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)

[2017-08-14 15:15:11,174] DEBUG About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)

[2017-08-14 15:15:11,277] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro .....
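If I read these logs right, both nodes ran the same poll with the same stored lower bound (15:11:02.563) and each sent one record to Kafka. A toy model of two workers polling from a shared last-committed offset (my interpretation of the symptom, not Connect internals):

```python
from datetime import datetime

def poll_window(rows, last_committed, db_now):
    """rec_timestamp > last_committed AND rec_timestamp < db_now."""
    return [r for r in rows if last_committed < r["rec_timestamp"] < db_now]

rows = [
    {"msisdn": "9831218547", "rec_timestamp": datetime(2017, 8, 14, 15, 11, 2, 563000)},
    {"msisdn": "9831212081", "rec_timestamp": datetime(2017, 8, 14, 15, 15, 4, 34000)},
]

shared_offset = datetime(2017, 8, 14, 15, 11, 2, 563000)  # last committed timestamp

# Node-1 and node-2 both poll with the same lower bound (end times from the logs).
node1 = poll_window(rows, shared_offset, datetime(2017, 8, 14, 15, 15, 7, 200000))
node2 = poll_window(rows, shared_offset, datetime(2017, 8, 14, 15, 15, 8, 815000))

# Each node independently picks up the same new row -> duplicate messages.
print(len(node1) + len(node2))  # prints 2: two records sent for one new row
```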

I am stuck here. Can anyone please guide me and give expert comments on what may be going wrong and how I should handle this? Please let me know in case I need to provide more information here (configuration etc.) regarding the setup.

Thanks in advance. Any help is appreciated.

