Skip to content

Commit 4bf9b3e

Browse files
Add functions to count object in use and objects in pool (#29)
* Add functions to count object in use and objects in pool * Define `keytype` and `valtype` for `Pool` * fixup! Define `keytype` and `valtype` for `Pool` * fixup! Add functions to count object in use and objects in pool * Bump version * Rename in terms of "use" * Rename `max` -> `limit` * Docs for keytype/valtype
1 parent c5cb574 commit 4bf9b3e

File tree

3 files changed

+161
-29
lines changed

3 files changed

+161
-29
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "ConcurrentUtilities"
22
uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
33
authors = ["Jacob Quinn <quinn.jacobd@gmail.com>"]
4-
version = "2.2.1"
4+
version = "2.3.0"
55

66
[deps]
77
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"

src/pools.jl

Lines changed: 66 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,96 @@ export Pool, acquire, release, drain!
44
import Base: acquire, release
55

66
"""
7-
Pool{T}(max::Int=4096)
8-
Pool{K, T}(max::Int=4096)
7+
Pool{T}(limit::Int=4096)
8+
Pool{K, T}(limit::Int=4096)
99
1010
A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects
11-
of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
11+
of type `K`.
12+
13+
Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
1214
function that returns a new object of type `T`.
1315
The `key` argument is optional and can be used to lookup objects that match a certain criteria
14-
(a Dict is used internally, so matching is `isequal`).
16+
(a `Dict` is used internally, so matching is `isequal`).
1517
16-
The `max` argument will limit the number of objects
17-
that can be acquired at any given time. If the limit has been reached, `acquire` will
18-
block until an object is returned to the pool via `release`.
18+
The `limit` argument will limit the number of objects that can be in use at any given time.
19+
If the limit has been reached, `acquire` will block until an object is released
20+
via `release`.
1921
20-
By default, `release(pool, obj)` will return the object to the pool for reuse.
21-
`release(pool)` will return the "permit" to the pool while not returning
22-
any object for reuse.
22+
- `release(pool, obj)` will return the object to the pool for reuse.
23+
- `release(pool)` will decrement the number in use but not return any object for reuse.
24+
- `drain!` can be used to remove objects that have been returned to the pool for reuse;
25+
it does *not* release any objects that are in use.
2326
24-
`drain!` can be used to remove any cached objects for reuse, but it does *not* release
25-
any active acquires.
27+
See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`.
28+
The key and object types can be inspected with `keytype` and `valtype` respectively.
2629
"""
2730
mutable struct Pool{K, T}
2831
lock::Threads.Condition
29-
max::Int
32+
limit::Int
3033
cur::Int
3134
keyedvalues::Dict{K, Vector{T}}
3235
values::Vector{T}
3336

34-
function Pool{K, T}(max::Int=4096) where {K, T}
37+
function Pool{K, T}(limit::Int=4096) where {K, T}
3538
T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`"))
36-
x = new(Threads.Condition(), max, 0)
39+
x = new(Threads.Condition(), limit, 0)
3740
if K === Nothing
3841
x.values = T[]
39-
safesizehint!(x.values, max)
42+
safesizehint!(x.values, limit)
4043
else
4144
x.keyedvalues = Dict{K, Vector{T}}()
4245
end
4346
return x
4447
end
4548
end
4649

47-
Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max)
50+
Pool{T}(limit::Int=4096) where {T} = Pool{Nothing, T}(limit)
4851

4952
safesizehint!(x, n) = sizehint!(x, min(4096, n))
5053

5154
# determines whether we'll look up object caches in .keyedvalues or .values
5255
iskeyed(::Pool{K}) where {K} = K !== Nothing
5356

57+
"""
58+
keytype(::Pool)
59+
60+
Return the type of the keys for the pool.
61+
If the pool is not keyed, this will return `Nothing`.
62+
"""
63+
Base.keytype(::Type{<:Pool{K}}) where {K} = K
64+
Base.keytype(p::Pool) = keytype(typeof(p))
65+
66+
"""
67+
valtype(::Pool)
68+
69+
Return the type of the objects that can be stored in the pool.
70+
"""
71+
Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T
72+
Base.valtype(p::Pool) = valtype(typeof(p))
73+
74+
"""
75+
Pools.limit(pool::Pool) -> Int
76+
77+
Return the maximum number of objects permitted to be in use at the same time.
78+
See `Pools.in_use(pool)` for the number of objects currently in use.
79+
"""
80+
limit(pool::Pool) = Base.@lock pool.lock pool.limit
81+
82+
"""
83+
Pools.in_use(pool::Pool) -> Int
84+
85+
Return the number of objects currently in use. Less than or equal to `Pools.limit(pool)`.
86+
"""
87+
in_use(pool::Pool) = Base.@lock pool.lock pool.cur
88+
89+
"""
90+
Pools.in_pool(pool::Pool) -> Int
91+
92+
Return the number of objects in the pool available for reuse.
93+
"""
94+
in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0)
95+
in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values)
96+
5497
"""
5598
drain!(pool)
5699
@@ -72,7 +115,7 @@ end
72115
TRUE(x) = true
73116

74117
@noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
75-
@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty"))
118+
@noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use"))
76119

77120
# NOTE: assumes you have the lock!
78121
function releasepermit(pool::Pool)
@@ -92,19 +135,19 @@ The `forcenew` keyword argument can be used to force the creation of a new objec
92135
The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid
93136
for reuse. By default, all objects are considered valid.
94137
If there are no objects available for reuse, `f` will be called to create a new object.
95-
If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`.
138+
If the pool is already at its usage limit, `acquire` will block until an object is returned to the pool via `release`.
96139
"""
97140
function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T}
98141
key isa K || keyerror(key, K)
99142
Base.@lock pool.lock begin
100143
# first get a permit
101-
while pool.cur >= pool.max
144+
while pool.cur >= pool.limit
102145
wait(pool.lock)
103146
end
104147
pool.cur += 1
105148
# now see if we can get an object from the pool for reuse
106149
if !forcenew
107-
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values
150+
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.limit), pool.keyedvalues, key) : pool.values
108151
while !isempty(objs)
109152
obj = pop!(objs)
110153
isvalid(obj) && return obj
@@ -126,10 +169,10 @@ end
126169
release(pool::Pool{K, T}, obj::T)
127170
release(pool::Pool{K, T})
128171
129-
Return an object to a `pool`, optionally keyed by the provided `key`.
172+
Release an object from usage by a `pool`, optionally keyed by the provided `key`.
130173
If `obj` is provided, it will be returned to the pool for reuse.
131174
Otherwise, if `nothing` is returned, or `release(pool)` is called,
132-
just the "permit" will be returned to the pool.
175+
the usage count will be decremented without an object being returned to the pool for reuse.
133176
"""
134177
function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T}
135178
key isa K || keyerror(key, K)

test/pools.jl

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,49 @@
1-
using ConcurrentUtilities, Test
1+
using ConcurrentUtilities.Pools, Test
22

33
@testset "Pools" begin
4+
pool_size = lengthPools.values
45
@testset "nonkeyed and pool basics" begin
56
pool = Pool{Int}(3)
7+
@test keytype(pool) === Nothing
8+
@test valtype(pool) === Int
9+
10+
@test Pools.limit(pool) == 3
11+
@test Pools.in_use(pool) == 0
12+
@test Pools.in_pool(pool) == 0
13+
614
# acquire an object from the pool
715
x1 = acquire(() -> 1, pool)
816
# no existing objects in the pool, so our function was called to create a new one
917
@test x1 == 1
18+
@test Pools.limit(pool) == 3
19+
@test Pools.in_use(pool) == 1
20+
@test Pools.in_pool(pool) == 0
21+
1022
# release back to the pool for reuse
1123
release(pool, x1)
24+
@test Pools.in_use(pool) == 0
25+
@test Pools.in_pool(pool) == 1
26+
1227
# acquire another object from the pool
1328
x1 = acquire(() -> 2, pool)
1429
# this time, the pool had an existing object, so our function was not called
1530
@test x1 == 1
31+
@test Pools.in_use(pool) == 1
32+
@test Pools.in_pool(pool) == 0
33+
1634
# but now there are no objects to reuse again, so the next acquire will call our function
1735
x2 = acquire(() -> 2, pool)
1836
@test x2 == 2
37+
@test Pools.in_use(pool) == 2
38+
@test Pools.in_pool(pool) == 0
39+
1940
x3 = acquire(() -> 3, pool)
2041
@test x3 == 3
21-
# the pool is now at capacity, so the next acquire will block until an object is released
42+
@test Pools.in_use(pool) == 3
43+
@test Pools.in_pool(pool) == 0
44+
45+
# the pool is now at `Pools.limit`, so the next acquire will block until an object is released
46+
@test Pools.in_use(pool) == Pools.limit(pool)
2247
tsk = @async acquire(() -> 4, pool; forcenew=true)
2348
yield()
2449
@test !istaskdone(tsk)
@@ -28,60 +53,110 @@ using ConcurrentUtilities, Test
2853
x1 = fetch(tsk)
2954
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
3055
@test x1 == 4
56+
@test Pools.in_use(pool) == 3
57+
@test Pools.in_pool(pool) == 1
58+
3159
# error to try and provide a key to a non-keyed pool
3260
@test_throws ArgumentError acquire(() -> 1, pool, 1)
61+
3362
# release objects back to the pool
3463
release(pool, x1)
3564
release(pool, x2)
3665
release(pool, x3)
66+
@test Pools.in_use(pool) == 0
67+
@test Pools.in_pool(pool) == 4
68+
3769
# acquire an object, but checking isvalid
3870
x1 = acquire(() -> 5, pool; isvalid=x -> x == 1)
3971
@test x1 == 1
72+
@test Pools.in_use(pool) == 1
73+
4074
# no valid objects, so our function was called to create a new one
4175
x2 = acquire(() -> 6, pool; isvalid=x -> x == 1)
4276
@test x2 == 6
43-
# we have one slot left in the pool, we now throw while creating new
77+
@test Pools.in_use(pool) == 2
78+
79+
# we have one permit left, we now throw while creating a new object
4480
# and we want to test that the permit isn't permanently lost for the pool
4581
@test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true)
82+
@test Pools.in_use(pool) == 2
83+
4684
# we can still acquire a new object
4785
x3 = acquire(() -> 7, pool; forcenew=true)
4886
@test x3 == 7
87+
@test Pools.in_use(pool) == 3
88+
4989
# release objects back to the pool
90+
drain!(pool)
5091
release(pool, x1)
5192
release(pool, x2)
5293
release(pool, x3)
94+
@test Pools.in_use(pool) == 0
95+
@test Pools.in_pool(pool) == 3
96+
5397
# try to do an invalid release
5498
@test_throws ArgumentError release(pool, 10)
99+
55100
# test that the invalid release didn't push the object to our pool for reuse
56101
x1 = acquire(() -> 8, pool)
57102
@test x1 == 7
103+
@test Pools.in_use(pool) == 1
104+
@test Pools.in_pool(pool) == 2
58105
# calling drain! removes all objects for reuse
59106
drain!(pool)
107+
@test Pools.in_use(pool) == 1
108+
@test Pools.in_pool(pool) == 0
109+
60110
x2 = acquire(() -> 9, pool)
61111
@test x2 == 9
112+
@test Pools.in_use(pool) == 2
113+
@test Pools.in_pool(pool) == 0
62114
end
63115

64116
@testset "keyed pool" begin
65117
# now test a keyed pool
66118
pool = Pool{String, Int}(3)
119+
@test keytype(pool) === String
120+
@test valtype(pool) === Int
121+
122+
@test Pools.limit(pool) == 3
123+
@test Pools.in_use(pool) == 0
124+
@test Pools.in_pool(pool) == 0
125+
67126
# acquire an object from the pool
68127
x1 = acquire(() -> 1, pool, "a")
69128
# no existing objects in the pool, so our function was called to create a new one
70129
@test x1 == 1
130+
@test Pools.in_use(pool) == 1
131+
@test Pools.in_pool(pool) == 0
132+
71133
# release back to the pool for reuse
72134
release(pool, "a", x1)
135+
@test Pools.in_use(pool) == 0
136+
@test Pools.in_pool(pool) == 1
137+
73138
# test for a different key
74139
x2 = acquire(() -> 2, pool, "b")
75140
# there's an existing object, but for a different key, so we don't reuse
76141
@test x2 == 2
142+
@test Pools.in_use(pool) == 1
143+
@test Pools.in_pool(pool) == 1
144+
77145
# acquire another object from the pool
78146
x1 = acquire(() -> 2, pool, "a")
79147
# this time, the pool had an existing object, so our function was not called
80148
@test x1 == 1
149+
@test Pools.in_use(pool) == 2
150+
@test Pools.in_pool(pool) == 0
151+
81152
x3 = acquire(() -> 3, pool, "a")
82153
@test x3 == 3
154+
@test Pools.in_use(pool) == 3
155+
@test Pools.in_pool(pool) == 0
156+
83157
# the pool is now at capacity, so the next acquire will block until an object is released
84158
# even though we've acquired using different keys, the capacity is shared across the pool
159+
@test Pools.in_use(pool) == Pools.limit(pool)
85160
tsk = @async acquire(() -> 4, pool, "c"; forcenew=true)
86161
yield()
87162
@test !istaskdone(tsk)
@@ -91,13 +166,27 @@ using ConcurrentUtilities, Test
91166
x1 = fetch(tsk)
92167
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
93168
@test x1 == 4
169+
@test Pools.in_use(pool) == 3
170+
@test Pools.in_pool(pool) == 1
171+
94172
# error to try and provide an invalid key to a keyed pool
95173
@test_throws ArgumentError acquire(() -> 1, pool, 1)
96-
# error to release an invalid key back to the pool
97-
@test_throws KeyError release(pool, "z", 1)
174+
@test Pools.in_use(pool) == 3
175+
@test Pools.in_pool(pool) == 1
176+
98177
# error to *not* provide a key to a keyed pool
99178
@test_throws ArgumentError acquire(() -> 1, pool)
179+
@test Pools.in_use(pool) == 3
180+
@test Pools.in_pool(pool) == 1
181+
100182
# error to *not* provide a key when releasing to a keyed pool
101183
@test_throws ArgumentError release(pool)
184+
@test Pools.in_use(pool) == 3
185+
@test Pools.in_pool(pool) == 1
186+
187+
# error to release an invalid key back to the pool
188+
@test_throws KeyError release(pool, "z", 1)
189+
@test_broken Pools.in_use(pool) == 3
190+
@test Pools.in_pool(pool) == 1
102191
end
103192
end

0 commit comments

Comments
 (0)