Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using WindowsAzure.Storage.Blob;
Expand All @@ -11,48 +12,64 @@ class AzureBlobLease : Lease
{
readonly bool isOwned;

readonly TimeSpan LeaseDuration;

// ctor needed for deserialization
internal AzureBlobLease()
{
}

internal AzureBlobLease(string partitionId, CloudBlockBlob blob) : base(partitionId)
internal AzureBlobLease(string partitionId, CloudBlockBlob blob, TimeSpan LeaseDuration) : base(partitionId)
{
this.Blob = blob;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
this.LeaseDuration = LeaseDuration;
}

internal AzureBlobLease(string partitionId, string owner, CloudBlockBlob blob) : base(partitionId)
internal AzureBlobLease(string partitionId, string owner, CloudBlockBlob blob, TimeSpan LeaseDuration) : base(partitionId)
{
this.Blob = blob;
this.Owner = owner;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
this.LeaseDuration = LeaseDuration;
}

internal AzureBlobLease(AzureBlobLease source)
internal AzureBlobLease(AzureBlobLease source, TimeSpan LeaseDuration)
: base(source)
{
this.Offset = source.Offset;
this.SequenceNumber = source.SequenceNumber;
this.Blob = source.Blob;
this.isOwned = source.isOwned;
this.LeaseDuration = LeaseDuration;
}

internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(source)
internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob, TimeSpan LeaseDuration) : base(source)
{
this.Offset = source.Offset;
this.SequenceNumber = source.SequenceNumber;
this.Blob = blob;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}
this.LeaseDuration = LeaseDuration;
}

// do not serialize
[JsonIgnore]
public CloudBlockBlob Blob { get; }

public override Task<bool> IsExpired()
{
public override Task<bool> IsExpired()
{
if (this.LeaseDuration > TimeSpan.FromSeconds(60))
{
DateTime lastModifiedTime = this.Blob.Properties.LastModified.Value.DateTime;
if (DateTime.UtcNow - lastModifiedTime > this.LeaseDuration)
{
return Task.FromResult<bool>(true);
}
return Task.FromResult<bool>(false);
}
return Task.FromResult(!this.isOwned);
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)

public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
{
AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease) lease)
AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease) lease,LeaseDuration)
{
Offset = checkpoint.Offset,
SequenceNumber = checkpoint.SequenceNumber
Expand All @@ -160,6 +160,9 @@ public Task DeleteCheckpointAsync(string partitionId)

public TimeSpan LeaseDuration => this.leaseDuration;

//This flag will be used in this class while renewing/acquiring new lease.
public bool IsInfiniteLease => this.LeaseDuration > TimeSpan.FromSeconds(60);

public Task<bool> LeaseStoreExistsAsync()
{
return this.eventHubContainer.ExistsAsync(null, this.operationContext);
Expand Down Expand Up @@ -255,7 +258,7 @@ public async Task<IEnumerable<Lease>> GetAllLeasesAsync()
// Discover partition id from URI path of the blob.
var partitionId = leaseBlob.Uri.AbsolutePath.Split('/').Last();

leaseList.Add(new AzureBlobLease(partitionId, owner, leaseBlob));
leaseList.Add(new AzureBlobLease(partitionId, owner, leaseBlob,LeaseDuration));
}

continuationToken = leaseBlobsResult.ContinuationToken;
Expand All @@ -271,7 +274,7 @@ public async Task<Lease> CreateLeaseIfNotExistsAsync(string partitionId) // thro
try
{
CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);
returnLease = new AzureBlobLease(partitionId, leaseBlob);
returnLease = new AzureBlobLease(partitionId, leaseBlob,LeaseDuration);
string jsonLease = JsonConvert.SerializeObject(returnLease);

ProcessorEventSource.Log.AzureStorageManagerInfo(
Expand Down Expand Up @@ -348,18 +351,28 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
}

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease");
renewLease = true;
newToken = await leaseBlob.ChangeLeaseAsync(
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
if (IsInfiniteLease)
{
//Incase of inifinite lease. we need to break the lease and acquire it again.
TimeSpan? breakReleaseTime = TimeSpan.FromSeconds(3);
await leaseBlob.BreakLeaseAsync(breakReleaseTime);
newToken = await leaseBlob.AcquireLeaseAsync(this.GetEffectiveLeaseDurationForBlob() /* infinite lease */, newLeaseId);
}
else
{
renewLease = true;
newToken = await leaseBlob.ChangeLeaseAsync(
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
}
}
else
{
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to AcquireLease");
newToken = await leaseBlob.AcquireLeaseAsync(
leaseDuration,
this.GetEffectiveLeaseDurationForBlob(),
newLeaseId,
null,
null,
Expand All @@ -374,7 +387,7 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
// ChangeLease doesn't renew so we should avoid lease expiring before next renew interval.
if (renewLease)
{
await this.RenewLeaseCoreAsync(lease).ConfigureAwait(false);
await this.RenewLeaseAsync(lease).ConfigureAwait(false);
}

await leaseBlob.UploadTextAsync(
Expand All @@ -401,6 +414,11 @@ await lease.Blob.SetMetadataAsync(

public Task<bool> RenewLeaseAsync(Lease lease)
{
//If Lease is infinite we don't need to renew it.
if (IsInfiniteLease)
{
return Task.FromResult<bool>(true);
}
return RenewLeaseCoreAsync((AzureBlobLease)lease);
}

Expand Down Expand Up @@ -439,7 +457,7 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
try
{
string leaseId = lease.Token;
AzureBlobLease releasedCopy = new AzureBlobLease(lease)
AzureBlobLease releasedCopy = new AzureBlobLease(lease, LeaseDuration)
{
Token = string.Empty,
Owner = string.Empty
Expand Down Expand Up @@ -518,7 +536,7 @@ async Task<Lease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) //

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, partitionId, "Raw JSON downloaded: " + jsonLease);
AzureBlobLease rehydrated = (AzureBlobLease)JsonConvert.DeserializeObject(jsonLease, typeof(AzureBlobLease));
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob, LeaseDuration);
return blobLease;
}

Expand Down Expand Up @@ -549,5 +567,15 @@ CloudBlockBlob GetBlockBlobReference(string partitionId)
{
return this.consumerGroupDirectory.GetBlockBlobReference(partitionId);
}

//This function will be used to get the lease duration which will be passed to azure storage library.
//If lease is infinite than while acquiring we need to pass null.
TimeSpan? GetEffectiveLeaseDurationForBlob()
{
TimeSpan? effectiveLeaseDuration = null;
if (!IsInfiniteLease)
effectiveLeaseDuration = this.LeaseDuration;
return effectiveLeaseDuration;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Microsoft.Azure.EventHubs.Processor
public class PartitionManagerOptions
{
const int MinLeaseDurationInSeconds = 15;
const int MaxLeaseDurationInSeconds = 60;
const int MaxLeaseDurationInSeconds = 1500;

TimeSpan renewInterval = TimeSpan.FromSeconds(10);
TimeSpan leaseDuration = TimeSpan.FromSeconds(30);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void InvalidPartitionManagerOptions()
Assert.ThrowsAsync<ArgumentException>(() =>
{
TestUtility.Log("Setting lease duration outside of allowed range should fail.");
pmo.LeaseDuration = TimeSpan.FromSeconds(65);
pmo.LeaseDuration = TimeSpan.FromSeconds(1565);
throw new InvalidOperationException("Setting LeaseDuration should have failed.");
}).Wait();
}
Expand Down