Skip to content

Commit 265f522

Browse files
authored
Refine handling of moving between replay and continue states (#3337)
1 parent 1b07f8b commit 265f522

File tree

5 files changed

+111
-47
lines changed

5 files changed

+111
-47
lines changed

src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6500,8 +6500,8 @@ private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject sta
65006500
bytes = null;
65016501
int offset = 0;
65026502
byte[] temp = null;
6503-
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
6504-
if (isAvailable)
6503+
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
6504+
if (canContinue)
65056505
{
65066506
if (isContinuing || isStarting)
65076507
{
@@ -12983,9 +12983,9 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
1298312983
char[] temp = null;
1298412984
bool buffIsRented = false;
1298512985
int startOffset = 0;
12986-
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
12986+
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
1298712987

12988-
if (isAvailable)
12988+
if (canContinue)
1298912989
{
1299012990
if (isContinuing || isStarting)
1299112991
{
@@ -13004,7 +13004,7 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
1300413004
length >> 1,
1300513005
stateObj,
1300613006
out length,
13007-
supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot
13007+
supportRentedBuff: !canContinue, // do not use the arraypool if we are going to keep the buffer in the snapshot
1300813008
rentedBuff: ref buffIsRented,
1300913009
startOffset,
1301013010
isStarting || isContinuing

src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6696,8 +6696,8 @@ private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject sta
66966696
bytes = null;
66976697
int offset = 0;
66986698
byte[] temp = null;
6699-
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
6700-
if (isAvailable)
6699+
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
6700+
if (canContinue)
67016701
{
67026702
if (isContinuing || isStarting)
67036703
{
@@ -13176,9 +13176,9 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
1317613176
char[] temp = null;
1317713177
bool buffIsRented = false;
1317813178
int startOffset = 0;
13179-
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
13179+
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
1318013180

13181-
if (isAvailable)
13181+
if (canContinue)
1318213182
{
1318313183
if (isContinuing || isStarting)
1318413184
{
@@ -13197,7 +13197,7 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
1319713197
length >> 1,
1319813198
stateObj,
1319913199
out length,
13200-
supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot
13200+
supportRentedBuff: !canContinue, // do not use the arraypool if we are going to keep the buffer in the snapshot
1320113201
rentedBuff: ref buffIsRented,
1320213202
startOffset,
1320313203
isStarting || isContinuing

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
3939
{
4040
buffer = null;
4141

42-
(bool isAvailable, bool isStarting, _) = stateObj.GetSnapshotStatuses();
42+
(bool canContinue, bool isStarting, _) = stateObj.GetSnapshotStatuses();
4343

4444
List<byte[]> cachedBytes = null;
45-
if (isAvailable)
45+
if (canContinue)
4646
{
4747
cachedBytes = stateObj.TryTakeSnapshotStorage() as List<byte[]>;
4848
if (isStarting)
@@ -78,10 +78,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
7878
byte[] byteArr = new byte[cb];
7979
// pass false for the writeDataSizeToSnapshot parameter because we want to only take data
8080
// from the current packet and not try to do a continue-capable multi packet read
81-
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, writeDataSizeToSnapshot: false, compatibilityMode: false);
81+
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, canContinue, writeDataSizeToSnapshot: false, compatibilityMode: false);
8282
if (result != TdsOperationStatus.Done)
8383
{
84-
if (result == TdsOperationStatus.NeedMoreData && isAvailable && cb == byteArr.Length)
84+
if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length)
8585
{
8686
// succeeded in getting the data but failed to find the next plp length
8787
returnAfterAdd = true;

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,7 +1870,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En
18701870
}
18711871
byte[] buf = null;
18721872
int offset = 0;
1873-
(bool isAvailable, bool isStarting, bool isContinuing) = GetSnapshotStatuses();
1873+
(bool canContinue, bool isStarting, bool isContinuing) = GetSnapshotStatuses();
18741874

18751875
if (isPlp)
18761876
{
@@ -1888,7 +1888,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En
18881888
if (((_inBytesUsed + length) > _inBytesRead) || (_inBytesPacket < length))
18891889
{
18901890
int startOffset = 0;
1891-
if (isAvailable)
1891+
if (canContinue)
18921892
{
18931893
if (isContinuing || isStarting)
18941894
{
@@ -1906,7 +1906,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En
19061906
buf = new byte[length];
19071907
}
19081908

1909-
TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, isAvailable);
1909+
TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, canContinue);
19101910

19111911
if (result != TdsOperationStatus.Done)
19121912
{
@@ -2036,22 +2036,23 @@ internal int ReadPlpBytesChunk(byte[] buff, int offset, int len)
20362036

20372037
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead)
20382038
{
2039+
bool canContinue = false;
20392040
bool isStarting = false;
20402041
bool isContinuing = false;
20412042
bool compatibilityMode = LocalAppContextSwitches.UseCompatibilityAsyncBehaviour;
20422043
if (!compatibilityMode)
20432044
{
2044-
(_, isStarting, isContinuing) = GetSnapshotStatuses();
2045+
(canContinue, isStarting, isContinuing) = GetSnapshotStatuses();
20452046
}
2046-
return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, isStarting || isContinuing, compatibilityMode);
2047+
return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, canContinue, canContinue, compatibilityMode);
20472048
}
20482049
// Reads the requested number of bytes from a plp data stream, or the entire data if
20492050
// requested length is -1 or larger than the actual length of data. First call to this method
20502051
// should be preceeded by a call to ReadPlpLength or ReadDataLength.
20512052
// Returns the actual bytes read.
20522053
// NOTE: This method must be retriable WITHOUT replaying a snapshot
20532054
// Every time you call this method increment the offset and decrease len by the value of totalBytesRead
2054-
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool writeDataSizeToSnapshot, bool compatibilityMode)
2055+
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool canContinue, bool writeDataSizeToSnapshot, bool compatibilityMode)
20552056
{
20562057
totalBytesRead = 0;
20572058

@@ -2076,9 +2077,16 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
20762077
// If total length is known up front, allocate the whole buffer in one shot instead of realloc'ing and copying over each time
20772078
if (buff == null && _longlen != TdsEnums.SQL_PLP_UNKNOWNLEN)
20782079
{
2079-
if (writeDataSizeToSnapshot)
2080+
if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive)
2081+
{
2082+
// legacy replay path perf optimization
2083+
// if there is a snapshot which contains a stored plp buffer take it
2084+
// and try to use it if it is the right length
2085+
buff = TryTakeSnapshotStorage() as byte[];
2086+
}
2087+
else if (writeDataSizeToSnapshot && canContinue && _snapshot != null)
20802088
{
2081-
// if there is a snapshot and it contains a stored plp buffer take it
2089+
// if there is a snapshot which it contains a stored plp buffer take it
20822090
// and try to use it if it is the right length
20832091
buff = TryTakeSnapshotStorage() as byte[];
20842092
if (buff != null)
@@ -2087,13 +2095,7 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
20872095
totalBytesRead = offset;
20882096
}
20892097
}
2090-
else if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive)
2091-
{
2092-
// legacy replay path perf optimization
2093-
// if there is a snapshot and it contains a stored plp buffer take it
2094-
// and try to use it if it is the right length
2095-
buff = TryTakeSnapshotStorage() as byte[];
2096-
}
2098+
20972099

20982100
if ((ulong)(buff?.Length ?? 0) != _longlen)
20992101
{
@@ -2145,44 +2147,50 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
21452147
_longlenleft -= (ulong)bytesRead;
21462148
if (result != TdsOperationStatus.Done)
21472149
{
2148-
if (writeDataSizeToSnapshot)
2150+
if (compatibilityMode && _snapshot != null)
21492151
{
2152+
// legacy replay path perf optimization
21502153
// a partial read has happened so store the target buffer in the snapshot
21512154
// so it can be re-used when another packet arrives and we read again
21522155
SetSnapshotStorage(buff);
2153-
SetSnapshotDataSize(bytesRead);
2154-
21552156
}
2156-
else if (compatibilityMode && _snapshot != null)
2157+
else if (canContinue)
21572158
{
2158-
// legacy replay path perf optimization
21592159
// a partial read has happened so store the target buffer in the snapshot
21602160
// so it can be re-used when another packet arrives and we read again
21612161
SetSnapshotStorage(buff);
2162+
if (writeDataSizeToSnapshot)
2163+
{
2164+
SetSnapshotDataSize(bytesRead);
2165+
}
21622166
}
21632167
return result;
21642168
}
2169+
if (writeDataSizeToSnapshot && canContinue)
2170+
{
2171+
SetSnapshotDataSize(bytesRead);
2172+
}
21652173

21662174
if (_longlenleft == 0)
21672175
{
21682176
// Read the next chunk or cleanup state if hit the end
21692177
result = TryReadPlpLength(false, out _);
21702178
if (result != TdsOperationStatus.Done)
21712179
{
2172-
if (writeDataSizeToSnapshot)
2173-
{
2174-
if (result == TdsOperationStatus.NeedMoreData)
2175-
{
2176-
SetSnapshotStorage(buff);
2177-
SetSnapshotDataSize(bytesRead);
2178-
}
2179-
}
2180-
else if (compatibilityMode && _snapshot != null)
2180+
if (compatibilityMode && _snapshot != null)
21812181
{
21822182
// a partial read has happened so store the target buffer in the snapshot
21832183
// so it can be re-used when another packet arrives and we read again
21842184
SetSnapshotStorage(buff);
21852185
}
2186+
else if (canContinue && result == TdsOperationStatus.NeedMoreData)
2187+
{
2188+
SetSnapshotStorage(buff);
2189+
if (writeDataSizeToSnapshot)
2190+
{
2191+
SetSnapshotDataSize(bytesRead);
2192+
}
2193+
}
21862194
return result;
21872195
}
21882196
}
@@ -3454,17 +3462,17 @@ internal bool IsSnapshotContinuing()
34543462
_snapshotStatus == TdsParserStateObject.SnapshotStatus.ContinueRunning;
34553463
}
34563464

3457-
internal (bool IsAvailable, bool IsStarting, bool IsContinuing) GetSnapshotStatuses()
3465+
internal (bool CanContinue, bool IsStarting, bool IsContinuing) GetSnapshotStatuses()
34583466
{
3459-
bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled;
3467+
bool canContinue = _snapshot != null && _snapshot.ContinueEnabled && _snapshotStatus != SnapshotStatus.NotActive;
34603468
bool isStarting = false;
34613469
bool isContinuing = false;
3462-
if (isAvailable)
3470+
if (canContinue)
34633471
{
34643472
isStarting = _snapshotStatus == SnapshotStatus.ReplayStarting;
34653473
isContinuing = _snapshotStatus == SnapshotStatus.ContinueRunning;
34663474
}
3467-
return (isAvailable, isStarting, isContinuing);
3475+
return (canContinue, isStarting, isContinuing);
34683476
}
34693477

34703478
internal int GetSnapshotStorageLength<T>()

src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using System.Collections.Generic;
77
using System.Data;
88
using System.Data.SqlTypes;
9+
using System.Diagnostics;
10+
using System.Linq;
911
using System.Reflection;
1012
using System.Text;
1113
using System.Threading;
@@ -603,6 +605,60 @@ integrated into a comprehensive development
603605
}
604606
}
605607

608+
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
609+
public static async Task CanReadBinaryData()
610+
{
611+
const int Size = 20_000;
612+
613+
byte[] data = Enumerable.Range(0, Size)
614+
.Select(i => (byte)(i % 256))
615+
.ToArray();
616+
string tableName = DataTestUtility.GenerateObjectName();
617+
618+
using (var connection = new SqlConnection(DataTestUtility.TCPConnectionString))
619+
{
620+
await connection.OpenAsync();
621+
622+
try
623+
{
624+
using (var createCommand = connection.CreateCommand())
625+
{
626+
createCommand.CommandText = $@"
627+
DROP TABLE IF EXISTS [{tableName}]
628+
CREATE TABLE [{tableName}] (Id INT IDENTITY(1,1) PRIMARY KEY, Data VARBINARY(MAX));
629+
INSERT INTO [{tableName}] (Data) VALUES (@data);";
630+
createCommand.Parameters.Add(new SqlParameter("@data", SqlDbType.VarBinary, Size) { Value = data });
631+
await createCommand.ExecuteNonQueryAsync();
632+
}
633+
634+
using (var command = connection.CreateCommand())
635+
{
636+
637+
command.CommandText = $"SELECT Data FROM [{tableName}]";
638+
command.Parameters.Clear();
639+
var result = (byte[])await command.ExecuteScalarAsync();
640+
641+
Assert.Equal(data, result);
642+
}
643+
644+
}
645+
finally
646+
{
647+
try
648+
{
649+
using (var dropCommand = connection.CreateCommand())
650+
{
651+
dropCommand.CommandText = $"DROP TABLE IF EXISTS [{tableName}]";
652+
dropCommand.ExecuteNonQuery();
653+
}
654+
}
655+
catch
656+
{
657+
}
658+
}
659+
}
660+
}
661+
606662
// Synapse: Cannot find data type 'rowversion'.
607663
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup), nameof(DataTestUtility.IsNotAzureSynapse))]
608664
public static void CheckLegacyNullRowVersionIsEmptyArray()

0 commit comments

Comments
 (0)