Apache Kafka - JDBC source connector producing duplicate messages
I am using the Confluent 3.2.1 platform. My use case is to incrementally read an Oracle table based on a timestamp column and send the records to a Kafka topic using the JDBC source connector. I am running Connect in distributed mode on a 3-node cluster. The connector config is below --
```json
{
  "name": "voice_source_connector_jdbc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:crystal/crystal123@x.x.x.x:1521:testdb",
    "mode": "timestamp",
    "tasks.max": 1,
    "query": "select * from CRYSTAL.VOICEEDRS",
    "timestamp.column.name": "rec_timestamp",
    "topic.prefix": "voice"
  }
}
```
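For reference, the connector also supports a `timestamp+incrementing` mode, which pairs the timestamp column with a strictly increasing numeric ID column so that each committed offset identifies a row unambiguously. A sketch of such a config, assuming a hypothetical numeric ID column `EDR_ID` existed in the table, would look like:

```json
{
  "name": "voice_source_connector_jdbc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:crystal/crystal123@x.x.x.x:1521:testdb",
    "mode": "timestamp+incrementing",
    "tasks.max": 1,
    "query": "select * from CRYSTAL.VOICEEDRS",
    "timestamp.column.name": "rec_timestamp",
    "incrementing.column.name": "EDR_ID",
    "topic.prefix": "voice"
  }
}
```

My table has no such ID column right now, so this is only a variant I am aware of, not something I have tested.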
I have tried many different combinations of parameters, but each time I get duplicate rows for inserts into the Oracle table subsequent to the first insert. If I insert 100 rows at the beginning, I get 100 messages in the topic; if I then insert another 100 rows, 200 messages are produced, making a total of 300.
I set the log level to DEBUG and tested one record at a time, and found the following --
1) Insert one row into the blank Oracle table --

```
msisdn,calldatetime,call_direction,called_num,call_duration,rec_timestamp
9831218547,2017-06-03 00:24:06,1,9831220190,1187,2017-08-14 15:11:02.563
```
I receive one message in the voice topic --

```json
{"msisdn":{"string":"9831218547"},"calldatetime":{"long":1496449446000},"call_direction":{"string":"1"},"called_num":{"string":"9831220190"},"call_duration":{"string":"1187"},"rec_timestamp":1502723462563}
```
I looked at the topic "stream-connect-offsets" and found the below message in partition 7 --

```
offset: 0 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 3729454508/3729454508 compression: none {"timestamp_nanos":563000000,"timestamp":1502723462563}
```
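As a sanity check (my own sketch, not part of the connector), the `timestamp` value stored in the offset payload is epoch milliseconds, and it decodes back to exactly the row's `rec_timestamp` value:

```python
from datetime import datetime, timezone

def millis_to_utc(ms: int) -> str:
    """Render an epoch-milliseconds offset value as a timestamp string."""
    dt = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
    # Append the millisecond component explicitly to avoid float rounding
    return dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{ms % 1000:03d}"

# The value committed to stream-connect-offsets for the first row:
print(millis_to_utc(1502723462563))  # 2017-08-14 15:11:02.563
```

So the committed offset itself looks correct; the row's timestamp round-trips cleanly.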
The log on one Connect node says --

```
[2017-08-14 15:11:10,464] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from CRYSTAL.VOICEEDRS', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from CRYSTAL.VOICEEDRS WHERE "rec_timestamp" > ? AND "rec_timestamp" < ? ORDER BY "rec_timestamp" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:11:10,464] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:11:10,466] DEBUG Executing prepared statement with timestamp value = 1970-01-01 00:00:00.000 end time = 2017-08-14 15:11:06.917 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)
[2017-08-14 15:11:10,474] DEBUG Sending 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)
[2017-08-14 15:11:10,523] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro .....
```
2) Insert a 2nd row into the same Oracle table, so the table looks like --

```
msisdn,calldatetime,call_direction,called_num,call_duration,rec_timestamp
9831218547,2017-06-03 00:24:06,1,9831220190,1187,2017-08-14 15:11:02.563
9831212081,2017-08-04 07:43:07,2,7980240223,1013,2017-08-14 15:15:04.034
```
I receive 2 more messages in the Kafka topic, which now looks like --

```json
{"msisdn":{"string":"9831218547"},"calldatetime":{"long":1496449446000},"call_direction":{"string":"1"},"called_num":{"string":"9831220190"},"call_duration":{"string":"1187"},"rec_timestamp":1502723462563}
{"msisdn":{"string":"9831212081"},"calldatetime":{"long":1501832587000},"call_direction":{"string":"2"},"called_num":{"string":"7980240223"},"call_duration":{"string":"1013"},"rec_timestamp":1502723704034}
{"msisdn":{"string":"9831212081"},"calldatetime":{"long":1501832587000},"call_direction":{"string":"2"},"called_num":{"string":"7980240223"},"call_duration":{"string":"1013"},"rec_timestamp":1502723704034}
```
Obviously the 2nd and 3rd messages are duplicates...
Partition 7 of the topic "stream-connect-offsets" now has a total of 3 messages --

```
offset: 0 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 3729454508/3729454508 compression: none {"timestamp_nanos":563000000,"timestamp":1502723462563}
offset: 1 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 2063511223/2063511223 compression: none {"timestamp_nanos":34000000,"timestamp":1502723704034}
offset: 2 key: ["voice_source_connector_jdbc",{"query":"query"}] checksum/computed: 2063511223/2063511223 compression: none {"timestamp_nanos":34000000,"timestamp":1502723704034}
```
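My reading of the logs from the two nodes (a toy model of my own, not connector code): both nodes executed the windowed query starting from the same committed timestamp, so each returned the new row once. Sketched under that assumption:

```python
# Toy model of the connector's windowed query:
#   select ... WHERE rec_timestamp > :start AND rec_timestamp < :end
# (a simplification based on the DEBUG logs, not actual connector code)

rows = [
    ("9831218547", 1502723462563),  # first row, offset already committed
    ("9831212081", 1502723704034),  # newly inserted row
]

def poll(start_ms: int, end_ms: int):
    """Return rows whose timestamp falls strictly inside (start, end)."""
    return [r for r in rows if start_ms < r[1] < end_ms]

# Both nodes polled with start = the same committed offset (15:11:02.563),
# using their own "end time" taken from the database clock:
node1 = poll(1502723462563, 1502723707200)  # end = 15:15:07.200
node2 = poll(1502723462563, 1502723708815)  # end = 15:15:08.815
print(node1 + node2)  # the new row is emitted twice
```

This matches the two identical offset commits at offsets 1 and 2 above, so the question for me is why two nodes ran the same query window at all when `tasks.max` is 1.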
Additional log messages from the two Connect nodes --
From node-1:

```
[2017-08-14 15:15:10,758] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from CRYSTAL.VOICEEDRS', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from CRYSTAL.VOICEEDRS WHERE "rec_timestamp" > ? AND "rec_timestamp" < ? ORDER BY "rec_timestamp" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:15:10,759] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:15:10,760] DEBUG Executing prepared statement with timestamp value = 2017-08-14 15:11:02.563 end time = 2017-08-14 15:15:07.200 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)
[2017-08-14 15:15:10,762] DEBUG Sending 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)
[2017-08-14 15:15:10,763] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:161)
```
From node-2:

```
[2017-08-14 15:15:11,169] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from CRYSTAL.VOICEEDRS', topicPrefix='voice', timestampColumn='rec_timestamp', incrementingColumn='null'} prepared SQL query: select * from CRYSTAL.VOICEEDRS WHERE "rec_timestamp" > ? AND "rec_timestamp" < ? ORDER BY "rec_timestamp" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:145)
[2017-08-14 15:15:11,169] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils:198)
[2017-08-14 15:15:11,171] DEBUG Executing prepared statement with timestamp value = 2017-08-14 15:11:02.563 end time = 2017-08-14 15:15:08.815 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:172)
[2017-08-14 15:15:11,174] DEBUG Sending 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:166)
[2017-08-14 15:15:11,277] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro .....
```
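Until I understand the root cause, one downstream mitigation I can think of (my own sketch, assuming `(msisdn, rec_timestamp)` uniquely identifies a call record, which is an assumption about my data, not a connector feature) is to deduplicate on the consumer side:

```python
def dedupe(messages):
    """Drop repeated records that share the same business key."""
    seen = set()
    unique = []
    for msg in messages:
        # Assumed business key: (msisdn, rec_timestamp) is unique per record
        key = (msg["msisdn"], msg["rec_timestamp"])
        if key not in seen:
            seen.add(key)
            unique.append(msg)
    return unique

msgs = [
    {"msisdn": "9831218547", "rec_timestamp": 1502723462563},
    {"msisdn": "9831212081", "rec_timestamp": 1502723704034},
    {"msisdn": "9831212081", "rec_timestamp": 1502723704034},  # duplicate
]
print(len(dedupe(msgs)))  # 2
```

But this only papers over the problem; I would much rather have the source not produce duplicates in the first place.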
I am stuck here. Can someone please guide me and comment on what may be going wrong and how I should handle this? Please let me know if I need to provide more information (configuration etc.) about my setup.
Thanks in advance. Any help is appreciated.