Skip to content

Commit 750f20e

Browse files
Merge pull request #25 from yuriy-glotanov/2020.1beta
fix indexframe rsync bug
2 parents 466f507 + 2ea00b7 commit 750f20e

File tree

7 files changed

+37
-33
lines changed

7 files changed

+37
-33
lines changed

interference-2019.3.jar

-348 KB
Binary file not shown.

interference-2019.3.jar.md5

-1
This file was deleted.

interference-2020.1.jar

1.62 KB
Binary file not shown.

interference-2020.1.jar.md5

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
c790f85b766e557a220bfbc57ba76047 *interference-2020.1.jar

src/main/java/su/interference/persistent/DataFile.java

+4
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,18 @@ public DataFile(int type, int nodeId, int remoteId) {
178178
//normal order of access - lock datafile, then lock table<framedata>
179179
//but, in case of allocate undo space this may looks as:
180180
//lock datafile<undo> - lock table<framedata> - try lock datafile
181+
//todo deprecated started param
181182
public synchronized FrameData createNewFrame(FrameData frame, int frameType, long allocId, boolean started, boolean external, DataObject t, Session s, LLT llt) throws Exception {
182183
//deadlock bug fix
183184
//instead this.allocateFrame we lock Table<FrameData> first
184185
final FrameData bd = t.allocateFrame(this, t, s, llt);
185186
final boolean setcurrenable = external?false:t.getName().equals("su.interference.persistent.UndoChunk")?false:true;
186187
//allocated for rframe
187188
if (allocId>0) { bd.setAllocId(allocId); }
189+
190+
//todo deprecated
188191
if (started) { bd.setStarted(1); }
192+
189193
final Frame db = frameType == 0 ? new DataFrame(bd, t) : new IndexFrame(bd, frameType, t);
190194
db.setObjectId(t.getObjectId());
191195
bd.setFrame(db);

src/main/java/su/interference/persistent/Table.java

+22-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ this software and associated documentation files (the "Software"), to deal in
2727
import java.io.IOException;
2828
import java.util.*;
2929
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ExecutionException;
3031
import java.util.concurrent.LinkedBlockingQueue;
3132
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicInteger;
@@ -102,6 +103,8 @@ public class Table implements DataObject, ResultSet {
102103
@Column
103104
private AtomicLong incValue;
104105

106+
@Transient
107+
private Map<Integer, Long> ixstartfs = new HashMap<>();
105108
@Transient
106109
private AtomicLong idValue2;
107110
@Transient
@@ -1212,6 +1215,7 @@ protected void delete (final Object o, final Session s, LLT extllt) throws Excep
12121215
if (extllt == null) { llt.commit(); }
12131216
}
12141217

1218+
//todo deprecated started param
12151219
public synchronized FrameData createNewFrame(final FrameData frame, final int fileId, final int frameType, final long allocId, final boolean started, final boolean setlbs, final boolean external, final Session s, final LLT llt) throws Exception {
12161220
final DataFile df = Storage.getStorage().getDataFileById(fileId);
12171221
final FrameData bd = df.createNewFrame(frame, frameType, allocId, started, external, this, s, llt);
@@ -1354,7 +1358,7 @@ public Boolean call() throws Exception {
13541358
List<FrameData> bb = Instance.getInstance().getTableById(getObjectId()).getFrames();
13551359

13561360
for (FrameData b : bb) {
1357-
if (b.getStarted()==1) {
1361+
if (b.getStarted() > 0) {
13581362
startframes.add(b.getFrameId());
13591363
}
13601364
}
@@ -1368,7 +1372,7 @@ public Boolean call() throws Exception {
13681372
throw new InternalException();
13691373
}
13701374
//frame must be local or remote chain started (RCS)
1371-
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() != 1) {
1375+
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() == 0) {
13721376
throw new InternalException();
13731377
}
13741378
IndexFrame el = bd.getIndexFrame();
@@ -1705,14 +1709,24 @@ public synchronized void remove (ValueSet key, Object o, Session s, LLT llt) thr
17051709
removeObjects(key, o, s, llt);
17061710
}
17071711

1712+
public synchronized void storeFrames(List<SyncFrame> frames, int sourceNodeId, LLT llt, Session s) throws Exception {
1713+
for (SyncFrame b : frames) {
1714+
if (b.isStarted()) {
1715+
ixstartfs.put(sourceNodeId, b.getBd().getFrameId());
1716+
b.getBd().setStarted(sourceNodeId);
1717+
}
1718+
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
1719+
}
1720+
}
1721+
17081722
@Deprecated
17091723
public synchronized List<Chunk> getContent(Session s) throws IOException, InternalException, NoSuchMethodException, InvocationTargetException, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException {
17101724
ArrayList<Chunk> res = new ArrayList<Chunk>();
17111725
res.addAll(getLocalContent(this.fileStart+this.frameStart, s));
17121726
//todo need performance optimizing
17131727
List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
17141728
for (FrameData b : bb) {
1715-
if (b.getStarted()==1) {
1729+
if (b.getStarted() > 0) {
17161730
res.addAll(getLocalContent(b.getFrameId(), s));
17171731
}
17181732
}
@@ -1729,7 +1743,7 @@ private synchronized List<Chunk> getLocalContent(long start, Session s) throws I
17291743
return res;
17301744
}
17311745
//frame must be local or remote chain started (RCS)
1732-
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() != 1) {
1746+
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() == 0) {
17331747
return res;
17341748
}
17351749
IndexFrame el = bd.getIndexFrame();
@@ -1770,7 +1784,7 @@ private synchronized ArrayList<FrameData> getLeafFrames (Session s) throws IOExc
17701784
//todo need performance optimizing
17711785
List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
17721786
for (FrameData b : bb) {
1773-
if (b.getStarted()==1) {
1787+
if (b.getStarted() > 0) {
17741788
res.addAll(getLocalLeafFrames(b.getFrameId(), s));
17751789
}
17761790
}
@@ -1783,7 +1797,7 @@ private synchronized ArrayList<FrameData> getLocalLeafFrames (long start, Sessio
17831797
boolean cnue = true;
17841798
FrameData bd = Instance.getInstance().getFrameById(start);
17851799
//frame must be local or remote chain started (RCS)
1786-
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() != 1) {
1800+
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() == 0) {
17871801
return res;
17881802
}
17891803
IndexFrame el = bd.getIndexFrame();
@@ -1866,7 +1880,7 @@ public synchronized DataChunk getObjectByKey (ValueSet key) throws IOException,
18661880
//todo need performance optimizing
18671881
final List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
18681882
for (FrameData b : bb) {
1869-
if (b.getStarted() == 1) {
1883+
if (b.getStarted() > 0) {
18701884
final DataChunk dc_ = getLocalObjectByKey(b.getFrameId(), key);
18711885
if (dc_ != null) {
18721886
return dc_;
@@ -1883,7 +1897,7 @@ public synchronized List<DataChunk> getObjectsByKey (ValueSet key) throws IOExce
18831897
//todo need performance optimizing
18841898
final List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
18851899
for (FrameData b : bb) {
1886-
if (b.getStarted() == 1) {
1900+
if (b.getStarted() > 0) {
18871901
r.addAll(getLocalObjectsByKey(b.getFrameId(), key));
18881902
}
18891903
}

src/main/java/su/interference/transport/SyncFrameEvent.java

+10-24
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
7070
Session s = Session.getDntmSession();
7171
HashMap<Long, Long> hmap = new HashMap<Long, Long>();
7272
HashMap<Long, Long> hmap2 = new HashMap<Long, Long>();
73-
//ArrayList<TransFrame> tframes = new ArrayList<>();
7473
LLT llt = LLT.getLLT();
7574
for (SyncFrame b : sb) {
7675
if (b.isAllowR()) {
@@ -84,19 +83,14 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
8483
for (DataFile f : dfs) {
8584
final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
8685
if (order == allocOrder) {
87-
//final LLT llt = LLT.getLLT();
88-
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), b.isStarted(), false, true, s, llt);
89-
//llt.commit();
90-
//bd.setAllocId(b.getAllocId());
86+
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt);
9187
bd.setFrame(null);
9288
b.setDf(f);
9389
}
9490
}
9591
logger.info("create replicated frame with allocId "+b.getAllocId()+" ptr "+bd.getPtr());
9692
} else {
9793
if (b.getObjectId() == bd.getObjectId()) {
98-
bd.setStarted(b.isStarted()?1:0);
99-
//s.persist(bd, llt);
10094
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
10195
logger.info("rframe bd found with allocId=" + b.getAllocId());
10296
} else {
@@ -106,9 +100,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
106100
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
107101
s.delete(bd);
108102
} else {
109-
bd.setStarted(b.isStarted()?1:0);
110103
bd.setObjectId(b.getObjectId());
111-
//s.persist(bd, llt);
112104
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
113105
}
114106
}
@@ -117,7 +109,6 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
117109
b.setBd(bd);
118110
hmap.put(b.getAllocId(), bd.getFrameId());
119111
hmap2.put(b.getFrameId(), bd.getFrameId());
120-
//tframes.addAll(b.getTframes());
121112
}
122113
}
123114
//updateTransFrames(tframes, hmap2, s);
@@ -187,8 +178,8 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
187178
//b.getBd().setFrame(null);
188179
}
189180

190-
List<SyncFrame> nframes = new ArrayList<>();
191-
List<SyncFrame> lframes = new ArrayList<>();
181+
final Map<Integer, List<SyncFrame>> storemap = new HashMap<>();
182+
192183
for (SyncFrame b : sb) {
193184
try {
194185
if (b.isAllowR()) {
@@ -209,29 +200,24 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
209200
frame.setRes05(lcF);
210201
frame.setRes06(parentB);
211202
frame.setRes07(lcB);
212-
//b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
213203
b.getBd().setFrame(frame);
214-
if (b.getFrameType() == 1) {
215-
lframes.add(b);
216-
}
217-
if (b.getFrameType() == 2) {
218-
nframes.add(b);
219-
}
220204
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
221205
}
222206
}
207+
if (storemap.get(t.getObjectId()) == null) {
208+
storemap.put(t.getObjectId(), new ArrayList<>());
209+
}
210+
storemap.get(t.getObjectId()).add(b);
223211
}
224212
} catch (Exception e) {
225213
e.printStackTrace();
226214
}
227215
//b.getBd().setFrame(null);
228216
}
229217

230-
for (SyncFrame b : lframes) {
231-
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
232-
}
233-
for (SyncFrame b : nframes) {
234-
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
218+
for (Map.Entry<Integer, List<SyncFrame>> entry : storemap.entrySet()) {
219+
final Table t = Instance.getInstance().getTableById(entry.getKey());
220+
t.storeFrames(entry.getValue(), this.getCallbackNodeId(), llt, s);
235221
}
236222

237223
llt.commit();

0 commit comments

Comments
 (0)