Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
#1 (04adf9b58997215c316ac4ca79d96627) switched from RUNNING to FAILED.
java.util.NoSuchElementException: No value present
    at java.util.Optional.get(Optional.java:135)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.prepareTemplates(JdbcOutputFormat.java:308)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.buildStmtProxy(JdbcOutputFormat.java:124)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.openInternal(JdbcOutputFormat.java:107)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.open(BaseRichOutputFormat.java:262)
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:95)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:63)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:433)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
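For readers unfamiliar with this exception: the top two frames indicate that `Optional.get()` was called on an empty `Optional` inside `JdbcOutputFormat.prepareTemplates`. The report does not show which value was missing at line 308, so the sketch below is only a standalone illustration of the failure mode and of a more descriptive alternative. The class `OptionalGetDemo`, its methods, and the "upsert statement" label are hypothetical and are not ChunJun code.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical demo class; not part of ChunJun.
class OptionalGetDemo {

    // Pattern seen in the trace: get() on an empty Optional throws
    // NoSuchElementException with the message "No value present".
    static String unsafeLookup(Optional<String> value) {
        return value.get();
    }

    // Alternative: fail with a message that names what was missing,
    // which makes the root cause easier to spot in the job log.
    static String checkedLookup(Optional<String> value, String what) {
        return value.orElseThrow(
                () -> new IllegalStateException("Missing required value: " + what));
    }

    public static void main(String[] args) {
        try {
            unsafeLookup(Optional.empty());
        } catch (NoSuchElementException e) {
            System.out.println("unsafe: " + e.getMessage());
        }
        try {
            // "upsert statement" is an assumed label, chosen only for illustration.
            checkedLookup(Optional.empty(), "upsert statement");
        } catch (IllegalStateException e) {
            System.out.println("checked: " + e.getMessage());
        }
    }
}
```

If the missing value turns out to be something the dialect does not provide for this sink configuration, a check of this kind would at least report that directly instead of the bare "No value present".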
What you expected to happen
The job should run normally.
How to reproduce
CREATE TABLE source
(
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp
) WITH (
    'connector' = 'binlog-x'
    ,'username' = 'root'
    ,'password' = 'xxxx'
    ,'cat' = 'insert,delete,update'
    ,'url' = 'jdbc:mysql://172.16.44.83:3306/ambari?useSSL=false'
    ,'host' = '172.16.44.83'
    ,'port' = '3306'
    ,'table' = 'ambari.test_binlog'
    ,'timestamp-format.standard' = 'SQL'
);

CREATE TABLE sink
(
    id int,
    cname varchar,
    sale_amt decimal(18,2),
    ttime timestamp,
    primary key (id) not enforced
) WITH (
    'connector' = 'clickhouse-x',
    'url' = 'jdbc:clickhouse://172.16.44.85:8123/ck_db1',
    'table-name' = 'test_binlog',
    'username' = 'bigdata',
    'password' = 'bigdata',
    'sink.buffer-flush.max-rows' = '1',
    'sink.all-replace' = 'true'
);

insert into sink
select *
from source u;
Anything else
No response
Version
master
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct