Description
Please answer these questions before submitting your issue.
- Why do you submit this issue?
- [√] Question or discussion
- Bug
- Requirement
- Feature or performance improvement
Question
- What do you want to know?
我在测试回滚,回滚后再获取数据时报错“Received an error returned by the server: something goes wrong with channel:[id: 0x146b0a4c, /127.0.0.1:64186 => /127.0.0.1:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: rollback error, clientId:1 batchId:-1 is not exist , please check
”
Bug
-
Which version of Canal-Server, OS ,Mysql?
MySQL版本:8.0.35
CanalSharp:1.2.1 -
What happen?
If possible, provide a way for reproducing the error. e.g. demo application.
var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddFilter("Microsoft", LogLevel.Debug)
.AddFilter("System", LogLevel.Information)
.AddConsole();
});
var _logger = loggerFactory.CreateLogger();
string id = Console.ReadLine();
var conn = new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1", 11111, id), _logger);
//连接到 Canal Server
await conn.ConnectAsync();
//订阅
await conn.SubscribeAsync("canal_test.test_user");
while (true)
{
//var entries = await conn.GetAsync(1024);
var entries = await conn.GetWithoutAckAsync(1024);
foreach (var entry in entries.Entries)
{
//不处理事务标记
if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
var eventType = rowChange.EventType;
foreach (var rowData in rowChange.RowDatas)
{
//删除的数据
if (eventType == CanalSharp.Protocol.EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList());
}
//插入的数据
else if (eventType == CanalSharp.Protocol.EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList());
}
//更新的数据
else
{
_logger.LogInformation("-------> before");
PrintColumn(rowData.BeforeColumns.ToList());
_logger.LogInformation("-------> after");
PrintColumn(rowData.AfterColumns.ToList());
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
await conn.RollbackAsync(entries.Id);
}
Console.ReadLine();
Requirement or improvement
- Please describe about your requirements or improvement suggestions.