Skip to content

Commit 843b5a6

Browse files
Merge pull request #23 from yuriy-glotanov/2020.1beta
fix wrong rsync behaviour
2 parents 554c4a1 + 52c192a commit 843b5a6

File tree

8 files changed

+154
-80
lines changed

8 files changed

+154
-80
lines changed

src/main/java/su/interference/core/Frame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ public int getBytesAmountWoHead() {
674674
return used;
675675
}
676676

677-
private boolean local() {
677+
public boolean isLocal() {
678678
return this.file+this.pointer == this.allocFile+this.allocPointer;
679679
}
680680

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
The MIT License (MIT)
3+
4+
Copyright (c) 2010-2019 head systems, ltd
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
this software and associated documentation files (the "Software"), to deal in
8+
the Software without restriction, including without limitation the rights to
9+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
10+
the Software, and to permit persons to whom the Software is furnished to do so,
11+
subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
18+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
20+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22+
23+
*/
24+
25+
package su.interference.core;
26+
27+
import java.lang.annotation.Retention;
28+
import java.lang.annotation.RetentionPolicy;
29+
30+
/**
31+
* @author Yuriy Glotanov
32+
* @since 1.0
33+
*/
34+
35+
@Retention(RetentionPolicy.RUNTIME)
36+
public @interface NoCheck {
37+
38+
}

src/main/java/su/interference/core/SyncFrame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception {
7171
//throw new MissingSyncFrameException();
7272
}
7373

74-
allowR = !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk");
74+
allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false;
7575
className = t.getName();
7676
rtran = frame.getLiveTransactions();
7777
tframes = frame.getLiveTransFrames();

src/main/java/su/interference/persistent/DataFile.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,16 @@ public synchronized FrameData createNewFrame(FrameData frame, int frameType, lon
232232
dc.setFrameData(bd);
233233
t.addIndexValue(dc);
234234
} else {
235-
//fix deadlock by reorder access
236-
//if current object is not DATAFILE, then locks DATAFILE first
237-
if (this.getFileId() == bd.getFile()) {
238-
s.persist(bd, llt);
239-
} else {
240-
DataFile df = Instance.getInstance().getDataFileById(bd.getFile());
241-
df.persist(bd, s, llt);
235+
//syncframe event should not persist new frame
236+
if (!external) {
237+
//fix deadlock by reorder access
238+
//if current object is not DATAFILE, then locks DATAFILE first
239+
if (this.getFileId() == bd.getFile()) {
240+
s.persist(bd, llt);
241+
} else {
242+
DataFile df = Instance.getInstance().getDataFileById(bd.getFile());
243+
df.persist(bd, s, llt);
244+
}
242245
}
243246
}
244247

@@ -453,7 +456,26 @@ public synchronized void writeFrame(final long ptr, final byte[] b) throws IOExc
453456

454457
this.file.seek(ptr);
455458
this.file.write(b);
459+
}
460+
461+
public synchronized void writeFrame(FrameData bd, final long ptr, final byte[] b, LLT llt, Session s) throws Exception {
462+
final ByteString bs = new ByteString(b);
463+
final int file_ = bs.getIntFromBytes(0);
464+
final long ptr_ = bs.getLongFromBytes(4);
465+
final int id_ = bs.getIntFromBytes(12);
466+
467+
if (this.fileId != file_) {
468+
logger.error("Wrong write frame operation with file = " + this.file + ", internal file = " + file_ + " ptr = " + ptr_);
469+
}
470+
471+
if (ptr != ptr) {
472+
logger.error("Wrong write frame operation with file = " + this.file + " ptr = " + ptr + ", internal file = " + file_ + " ptr = " + ptr_);
473+
}
474+
475+
this.file.seek(ptr);
476+
this.file.write(b);
456477

478+
s.persist(bd, llt);
457479
}
458480

459481
public int getMaxMemoryCache() {

src/main/java/su/interference/persistent/Session.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ public DataChunk persist (Object o) throws Exception {
359359
return persist(o, null);
360360
}
361361

362-
protected DataChunk persist (Object o, LLT llt) throws Exception {
362+
public DataChunk persist (Object o, LLT llt) throws Exception {
363363
final Table t = Instance.getInstance().getTableByName(o.getClass().getName());
364364
if (t != null) {
365365
return t.persist(o, this, llt);
@@ -378,7 +378,7 @@ public ResultSet execute(String sql) throws Exception {
378378
return ds.getTable();
379379
}
380380

381-
protected void delete (Object o, LLT llt) throws Exception {
381+
public void delete (Object o, LLT llt) throws Exception {
382382
final Table t = Instance.getInstance().getTableByName(o.getClass().getName());
383383
if (t != null) {
384384
t.delete(o, this, llt);

src/main/java/su/interference/persistent/Table.java

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -455,13 +455,19 @@ private java.lang.reflect.Field getTableIdField() throws ClassNotFoundException,
455455
final java.lang.reflect.Field[] f = c.getDeclaredFields();
456456
for (int i=0; i<f.length; i++) {
457457
Id a = f[i].getAnnotation(Id.class);
458-
if (a!=null) {
458+
if (a != null) {
459459
return f[i];
460460
}
461461
}
462462
return null;
463463
}
464464

465+
public boolean isIdFieldNoCheck() throws ClassNotFoundException, MalformedURLException {
466+
java.lang.reflect.Field f = getTableIdField();
467+
NoCheck a = f.getAnnotation(NoCheck.class);
468+
return a == null ? false : true;
469+
}
470+
465471
public java.lang.reflect.Field getIdField() {
466472
return this.idfield;
467473
}
@@ -889,12 +895,15 @@ private void ident(final Object o, final Session s, final LLT llt) throws Except
889895
}
890896
} else {
891897
//for distributed ids, id value don't replace exists > 0
892-
final long exists = (long)f[i].get(o);
893-
if (exists == 0) {
894-
if (f[i].getType().getName().equals("int")) {
898+
if (f[i].getType().getName().equals("int")) {
899+
final int exists = (int)f[i].get(o);
900+
if (exists == 0) {
895901
f[i].setInt(o, (int) (this.getIdValue(s, llt) * Storage.MAX_NODES) + Config.getConfig().LOCAL_NODE_ID);
896902
}
897-
if (f[i].getType().getName().equals("long")) {
903+
}
904+
if (f[i].getType().getName().equals("long")) {
905+
final long exists = (long)f[i].get(o);
906+
if (exists == 0) {
898907
f[i].setLong(o, (this.getIdValue(s, llt) * Storage.MAX_NODES) + Config.getConfig().LOCAL_NODE_ID);
899908
}
900909
}
@@ -1206,33 +1215,35 @@ protected void delete (final Object o, final Session s, LLT extllt) throws Excep
12061215
public synchronized FrameData createNewFrame(final FrameData frame, final int fileId, final int frameType, final long allocId, final boolean started, final boolean setlbs, final boolean external, final Session s, final LLT llt) throws Exception {
12071216
final DataFile df = Storage.getStorage().getDataFileById(fileId);
12081217
final FrameData bd = df.createNewFrame(frame, frameType, allocId, started, external, this, s, llt);
1209-
boolean done = true;
1218+
if (!external) {
1219+
boolean done = true;
12101220
//System.out.println("Table.createNewFrame: old="+(frame==null?"null":frame.getFrameId())+" frame="+(frame.getFrame()==null?"null":(frame.getFrame().getFrameData()==null?":null":frame.getFrame().getFrameData().getFrameId())));
1211-
if (setlbs && !this.getName().equals("su.interference.persistent.UndoChunk")) {
1212-
done = false;
1213-
for (WaitFrame wb : this.lbs) {
1214-
if (wb.trySetBd(frame, bd, frameType)) {
1215-
done = true;
1216-
break;
1221+
if (setlbs && !this.getName().equals("su.interference.persistent.UndoChunk")) {
1222+
done = false;
1223+
for (WaitFrame wb : this.lbs) {
1224+
if (wb.trySetBd(frame, bd, frameType)) {
1225+
done = true;
1226+
break;
1227+
}
12171228
}
12181229
}
1219-
}
1220-
if (!done) {
1221-
// todo evicted frame -> metric
1222-
for (WaitFrame wb : this.lbs) {
1223-
if (wb.getBd().getFile() == bd.getFile()) {
1224-
// remove evicted ptr from prevframe
1225-
frame.setNextFrame(wb.getBd().getPtr());
1226-
s.persist(frame);
1230+
if (!done) {
1231+
// todo evicted frame -> metric
1232+
for (WaitFrame wb : this.lbs) {
1233+
if (wb.getBd().getFile() == bd.getFile()) {
1234+
// remove evicted ptr from prevframe
1235+
frame.setNextFrame(wb.getBd().getPtr());
1236+
s.persist(frame);
1237+
}
12271238
}
1228-
}
1229-
bd.clearCurrent();
1230-
s.persist(bd);
1231-
logger.info("evict frame " + bd.getObjectId() + ":" + bd.getFile() + ":" + bd.getPtr() + " " + Thread.currentThread().getName());
1232-
for (WaitFrame wb : this.lbs) {
1233-
final FrameData bd_ = wb.acquire(fileId);
1234-
if (bd_ != null) {
1235-
return bd_;
1239+
bd.clearCurrent();
1240+
s.persist(bd);
1241+
logger.info("evict frame " + bd.getObjectId() + ":" + bd.getFile() + ":" + bd.getPtr() + " " + Thread.currentThread().getName());
1242+
for (WaitFrame wb : this.lbs) {
1243+
final FrameData bd_ = wb.acquire(fileId);
1244+
if (bd_ != null) {
1245+
return bd_;
1246+
}
12361247
}
12371248
}
12381249
}
@@ -1508,30 +1519,32 @@ public DataChunk getChunkByEntity (Object o, Session s) throws IOException, Invo
15081519
} else {
15091520
final EntityContainer to = (EntityContainer)o;
15101521
final Table idt = getFirstIndexByIdColumn();
1511-
if (to.getDataChunk()==null) {
1512-
final DataChunkId dcid = new DataChunkId(o, s);
1513-
if (idt!=null) {
1514-
final DataChunk idc = idt.getObjectByKey(new ValueSet(dcid.getId()));
1515-
if (idc==null) {
1516-
return null;
1517-
}
1518-
final IndexChunk ibx = (IndexChunk)idc.getEntity();
1519-
return ibx.getDataChunk();
1520-
} else {
1521-
final byte[] id = dcid.getIdBytes();
1522-
if (id != null) {
1523-
final List<FrameData> bds = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
1524-
for (FrameData b : bds) {
1525-
for (Chunk dc : b.getDataFrame().getFrameChunks(s)) {
1526-
if (Arrays.equals(id, ((DataChunk) dc).getSerializedId(s))) {
1527-
return (DataChunk) dc;
1522+
if (!isIdFieldNoCheck()) {
1523+
if (to.getDataChunk() == null) {
1524+
final DataChunkId dcid = new DataChunkId(o, s);
1525+
if (idt != null) {
1526+
final DataChunk idc = idt.getObjectByKey(new ValueSet(dcid.getId()));
1527+
if (idc == null) {
1528+
return null;
1529+
}
1530+
final IndexChunk ibx = (IndexChunk) idc.getEntity();
1531+
return ibx.getDataChunk();
1532+
} else {
1533+
final byte[] id = dcid.getIdBytes();
1534+
if (id != null) {
1535+
final List<FrameData> bds = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
1536+
for (FrameData b : bds) {
1537+
for (Chunk dc : b.getDataFrame().getFrameChunks(s)) {
1538+
if (Arrays.equals(id, ((DataChunk) dc).getSerializedId(s))) {
1539+
return (DataChunk) dc;
1540+
}
15281541
}
15291542
}
15301543
}
15311544
}
1545+
} else {
1546+
return to.getDataChunk();
15321547
}
1533-
} else {
1534-
return to.getDataChunk();
15351548
}
15361549
}
15371550

src/main/java/su/interference/transport/SyncFrameEvent.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
7171
HashMap<Long, Long> hmap = new HashMap<Long, Long>();
7272
HashMap<Long, Long> hmap2 = new HashMap<Long, Long>();
7373
//ArrayList<TransFrame> tframes = new ArrayList<>();
74+
LLT llt = LLT.getLLT();
7475
for (SyncFrame b : sb) {
7576
if (b.isAllowR()) {
7677
updateTransactions(b.getRtran(), s);
@@ -84,30 +85,30 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
8485
final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
8586
if (order == allocOrder) {
8687
//final LLT llt = LLT.getLLT();
87-
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), b.isStarted(), false, true, s, null);
88+
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), b.isStarted(), false, true, s, llt);
8889
//llt.commit();
8990
//bd.setAllocId(b.getAllocId());
9091
bd.setFrame(null);
9192
b.setDf(f);
9293
}
9394
}
94-
logger.debug("create replicated frame with allocId "+b.getAllocId()+" ptr "+bd.getPtr());
95+
logger.info("create replicated frame with allocId "+b.getAllocId()+" ptr "+bd.getPtr());
9596
} else {
9697
if (b.getObjectId() == bd.getObjectId()) {
9798
bd.setStarted(b.isStarted()?1:0);
98-
s.persist(bd);
99+
//s.persist(bd, llt);
99100
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
100-
logger.debug("rframe bd found with allocId=" + b.getAllocId());
101+
logger.info("rframe bd found with allocId=" + b.getAllocId());
101102
} else {
102103
if (b.getObjectId() == 0) {
103104
final FreeFrame fb = new FreeFrame(0, bd.getFrameId(), bd.getSize());
104-
s.persist(fb);
105+
s.persist(fb, llt);
105106
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
106107
s.delete(bd);
107108
} else {
108109
bd.setStarted(b.isStarted()?1:0);
109110
bd.setObjectId(b.getObjectId());
110-
s.persist(bd);
111+
//s.persist(bd, llt);
111112
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
112113
}
113114
}
@@ -119,7 +120,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
119120
//tframes.addAll(b.getTframes());
120121
}
121122
}
122-
123+
llt.commit();
123124
//updateTransFrames(tframes, hmap2, s);
124125

125126
final Map<Long, List<Chunk>> umap = new ConcurrentHashMap<>();
@@ -144,9 +145,9 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
144145
frame.setRes02(nextF);
145146
frame.setRes06(prevB);
146147
frame.setRes07(nextB);
147-
b.getDf().writeFrame(b.getBd().getPtr(), frame.getFrame());
148+
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
148149
b.getBd().setFrame(null);
149-
logger.debug("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
150+
logger.info("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
150151
}
151152
}
152153
}
@@ -175,9 +176,9 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
175176
frame.setRes02(nextF);
176177
frame.setRes06(prevB);
177178
frame.setRes07(nextB);
178-
b.getDf().writeFrame(b.getBd().getPtr(), frame.getFrame());
179+
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
179180
b.getBd().setFrame(frame);
180-
logger.debug("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
181+
logger.info("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
181182
}
182183
}
183184
}
@@ -207,9 +208,9 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
207208
frame.setRes05(lcF);
208209
frame.setRes06(parentB);
209210
frame.setRes07(lcB);
210-
b.getDf().writeFrame(b.getBd().getPtr(), frame.getFrame());
211+
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
211212
b.getBd().setFrame(frame);
212-
logger.debug("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
213+
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
213214
}
214215
}
215216
}
@@ -230,7 +231,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
230231
SQLCursor.addStreamFrame(new ContainerFrame(entry.getKey(), entry.getValue()));
231232
}
232233

233-
logger.info(sb.length + " frame(s) were received and synced");
234+
logger.debug(sb.length + " frame(s) were received and synced");
234235

235236
return 0;
236237

0 commit comments

Comments
 (0)