Skip to content

Commit

Permalink
support for infinity in sorted set commands params & results (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
maizy committed Jul 30, 2024
1 parent 637377d commit d62e461
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 31 deletions.
100 changes: 95 additions & 5 deletions modules/redis-it/src/test/scala/zio/redis/SortedSetsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ trait SortedSetsSpec extends IntegrationSpec {
result <- redis.bzPopMax(duration, key1, key2, key3).returning[String]
} yield assert(result)(isSome(equalTo((key1, tokyo))))
),
test("infinity score in set")(
for {
redis <- ZIO.service[Redis]
key1 <- uuid
key2 <- uuid
duration = Duration.fromMillis(1000)
delhi = MemberScore("Delhi", 1d)
london = MemberScore("London", 3d)
tokyo = MemberScore("Tokyo", 5d)
edge = MemberScore("The edge of universe", Double.PositiveInfinity)
_ <- redis.zAdd(key1)(delhi, edge)
_ <- redis.zAdd(key2)(london, tokyo)
result <- redis.bzPopMax(duration, key1, key2).returning[String]
} yield assert(result)(isSome(equalTo((key1, edge))))
),
test("empty set")(
for {
redis <- ZIO.service[Redis]
Expand All @@ -53,6 +68,21 @@ trait SortedSetsSpec extends IntegrationSpec {
result <- redis.bzPopMin(duration, key1, key2, key3).returning[String]
} yield assert(result)(isSome(equalTo((key2, delhi))))
),
test("negative infinity score in set")(
for {
redis <- ZIO.service[Redis]
key1 <- uuid
key2 <- uuid
duration = Duration.fromMillis(1000)
delhi = MemberScore("Delhi", 1d)
london = MemberScore("London", 3d)
paris = MemberScore("Paris", 4d)
quark = MemberScore("Quark", Double.NegativeInfinity)
_ <- redis.zAdd(key1)(delhi, quark)
_ <- redis.zAdd(key2)(london, paris)
result <- redis.bzPopMin(duration, key1, key2).returning[String]
} yield assert(result)(isSome(equalTo((key1, quark))))
),
test("empty set")(
for {
redis <- ZIO.service[Redis]
Expand Down Expand Up @@ -94,7 +124,18 @@ trait SortedSetsSpec extends IntegrationSpec {
for {
redis <- ZIO.service[Redis]
key <- uuid
added <- redis.zAdd(key)(MemberScore("a", 1d), MemberScore("b", 2d), MemberScore("c", 3d))
added <- redis.zAdd(key)(MemberScore("a", 1d), MemberScore("b", 3.1415e50), MemberScore("c", 3d))
} yield assert(added)(equalTo(3L))
},
test("multiple elements with negative & positive infinity") {
for {
redis <- ZIO.service[Redis]
key <- uuid
added <- redis.zAdd(key)(
MemberScore("neg infinity", Double.NegativeInfinity),
MemberScore("a", 1d),
MemberScore("pos infinity", Double.PositiveInfinity)
)
} yield assert(added)(equalTo(3L))
},
test("error when not set") {
Expand Down Expand Up @@ -840,9 +881,15 @@ trait SortedSetsSpec extends IntegrationSpec {
london = MemberScore("London", 3d)
paris = MemberScore("Paris", 4d)
tokyo = MemberScore("Tokyo", 5d)
_ <- redis.zAdd(key)(delhi, mumbai, london, tokyo, paris)
edge = MemberScore("The edge of universe", Double.PositiveInfinity)
quark = MemberScore("Quark", Double.NegativeInfinity)
_ <- redis.zAdd(key)(edge, delhi, mumbai, london, tokyo, paris, quark)
result <- redis.zRange(key, 0 to -1).returning[String]
} yield assert(result.toList)(equalTo(List("Delhi", "Mumbai", "London", "Paris", "Tokyo")))
} yield assert(result.toList)(
equalTo(
List("Quark", "Delhi", "Mumbai", "London", "Paris", "Tokyo", "The edge of universe")
)
)
},
test("empty set") {
for {
Expand All @@ -862,10 +909,12 @@ trait SortedSetsSpec extends IntegrationSpec {
london = MemberScore("London", 3d)
paris = MemberScore("Paris", 4d)
tokyo = MemberScore("Tokyo", 5d)
_ <- redis.zAdd(key)(delhi, mumbai, london, tokyo, paris)
edge = MemberScore("The edge of universe", Double.PositiveInfinity)
quark = MemberScore("Quark", Double.NegativeInfinity)
_ <- redis.zAdd(key)(edge, delhi, mumbai, quark, london, tokyo, paris)
result <- redis.zRangeWithScores(key, 0 to -1).returning[String]
} yield assert(result.toList)(
equalTo(List(delhi, mumbai, london, paris, tokyo))
equalTo(List(quark, delhi, mumbai, london, paris, tokyo, edge))
)
},
test("empty set") {
Expand Down Expand Up @@ -1398,6 +1447,18 @@ trait SortedSetsSpec extends IntegrationSpec {
members <- scanAll(key)
} yield assert(members)(equalTo(Chunk(a, b, c)))
},
test("with infinity in set") {
for {
redis <- ZIO.service[Redis]
key <- uuid
a = MemberScore("a", 1d)
b = MemberScore("b", 2d)
inf = MemberScore("inf", Double.PositiveInfinity)
negInf = MemberScore("neg inf", Double.NegativeInfinity)
_ <- redis.zAdd(key)(a, b, inf, negInf)
members <- scanAll(key)
} yield assert(members)(equalTo(Chunk(negInf, a, b, inf)))
},
test("empty set") {
for {
redis <- ZIO.service[Redis]
Expand Down Expand Up @@ -1470,6 +1531,14 @@ trait SortedSetsSpec extends IntegrationSpec {
result <- redis.zScore(key, "Delhi")
} yield assert(result)(isSome(equalTo(10.0)))
},
test("infinity score in set") {
for {
redis <- ZIO.service[Redis]
key <- uuid
_ <- redis.zAdd(key)(MemberScore("Delhi", 10d), MemberScore("Infinity", Double.PositiveInfinity))
result <- redis.zScore(key, "Infinity")
} yield assert(result)(isSome(equalTo(Double.PositiveInfinity)))
},
test("empty set") {
for {
redis <- ZIO.service[Redis]
Expand Down Expand Up @@ -1499,6 +1568,27 @@ trait SortedSetsSpec extends IntegrationSpec {
key <- uuid
result <- redis.zMScore(key, "Hyderabad")
} yield assert(result)(equalTo(Chunk(None)))
},
test("infinity score") {
for {
redis <- ZIO.service[Redis]
key <- uuid
_ <- redis.zAdd(key)(
MemberScore("Delhi", 10d),
MemberScore("Infinity", Double.PositiveInfinity),
MemberScore("-Infinity", Double.NegativeInfinity)
)
result <- redis.zMScore(key, "Infinity", "-Infinity", "Delhi", "Ankh-Morpork")
} yield assert(result)(
equalTo(
Chunk(
Some(Double.PositiveInfinity),
Some(Double.NegativeInfinity),
Some(10d),
None
)
)
)
}
),
suite("zUnion")(
Expand Down
10 changes: 8 additions & 2 deletions modules/redis/src/main/scala/zio/redis/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,14 @@ object Input {
}

final case class MemberScoreInput[M: BinaryCodec]() extends Input[MemberScore[M]] {
def encode(data: MemberScore[M]): RespCommand =
RespCommand(RespCommandArgument.Value(data.score.toString), RespCommandArgument.Value(data.member))
def encode(data: MemberScore[M]): RespCommand = {
val score = data.score match {
case Double.NegativeInfinity => "-inf"
case Double.PositiveInfinity => "+inf"
case d: Double => d.toString.toLowerCase
}
RespCommand(RespCommandArgument.Value(score), RespCommandArgument.Value(data.member))
}
}

case object NoAckInput extends Input[NoAck] {
Expand Down
24 changes: 20 additions & 4 deletions modules/redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ object Output {
}
}

case object DoubleOrInfinity extends Output[Double] {
protected def tryDecode(respValue: RespValue): Double =
respValue match {
case RespValue.BulkString(bytes) => decodeDouble(bytes, withInfinity = true)
case other => throw ProtocolError(s"$other isn't a double or an infinity.")
}
}

private object DurationOutput extends Output[Long] {
protected def tryDecode(respValue: RespValue): Long =
respValue match {
Expand Down Expand Up @@ -729,11 +737,19 @@ object Output {
}
}

private def decodeDouble(bytes: Chunk[Byte]): Double = {
private def decodeDouble(bytes: Chunk[Byte], withInfinity: Boolean = false): Double = {
val text = new String(bytes.toArray, StandardCharsets.UTF_8)
try text.toDouble
catch {
case _: NumberFormatException => throw ProtocolError(s"'$text' isn't a double.")
text match {
case "inf" if withInfinity => Double.PositiveInfinity
case "-inf" if withInfinity => Double.NegativeInfinity
case _ =>
try text.toDouble
catch {
case _: NumberFormatException =>
throw ProtocolError(
if (withInfinity) s"'$text' isn't a double or an infinity." else s"'$text' isn't a double."
)
}
}
}

Expand Down
44 changes: 24 additions & 20 deletions modules/redis/src/main/scala/zio/redis/api/SortedSets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
* @return
* A three-element Chunk with the first element being the name of the key where a member was popped, the second
* element is the popped member itself, and the third element is the score of the popped element. An empty chunk is
* returned when no element could be popped and the timeout expired.
* returned when no element could be popped and the timeout expired. Double.PositiveInfinity and
* Double.NegativeInfinity are valid scores as well.
*/
final def bzPopMax[K: Schema](
timeout: Duration,
Expand All @@ -49,7 +50,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
new ResultBuilder1[({ type lambda[x] = Option[(K, MemberScore[x])] })#lambda, G] {
def returning[M: Schema]: G[Option[(K, MemberScore[M])]] = {
val memberScoreOutput =
Tuple3Output(ArbitraryOutput[K](), ArbitraryOutput[M](), DoubleOutput).map { case (k, m, s) =>
Tuple3Output(ArbitraryOutput[K](), ArbitraryOutput[M](), DoubleOrInfinity).map { case (k, m, s) =>
(k, MemberScore(m, s))
}

Expand All @@ -76,7 +77,8 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
* @return
* A three-element Chunk with the first element being the name of the key where a member was popped, the second
* element is the popped member itself, and the third element is the score of the popped element. An empty chunk is
* returned when no element could be popped and the timeout expired.
* returned when no element could be popped and the timeout expired. Double.PositiveInfinity and
* Double.NegativeInfinity are valid scores as well.
*/
final def bzPopMin[K: Schema](
timeout: Duration,
Expand All @@ -86,7 +88,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
new ResultBuilder1[({ type lambda[x] = Option[(K, MemberScore[x])] })#lambda, G] {
def returning[M: Schema]: G[Option[(K, MemberScore[M])]] = {
val memberScoreOutput =
Tuple3Output(ArbitraryOutput[K](), ArbitraryOutput[M](), DoubleOutput).map { case (k, m, s) =>
Tuple3Output(ArbitraryOutput[K](), ArbitraryOutput[M](), DoubleOrInfinity).map { case (k, m, s) =>
(k, MemberScore(m, s))
}

Expand Down Expand Up @@ -248,7 +250,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
NonEmptyList(ArbitraryKeyInput[K]()),
WithScoresInput
),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((keys.size + 1, (key, keys.toList), WithScores))
Expand Down Expand Up @@ -366,7 +368,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
OptionalInput(WeightsInput),
WithScoresInput
),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((keys.size + 1, (key, keys.toList), aggregate, weights, WithScores))
Expand Down Expand Up @@ -437,10 +439,11 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
* Keys of the rest sets
* @return
* List of scores or None associated with the specified member values (a double precision floating point number).
* Double.PositiveInfinity and Double.NegativeInfinity are valid scores as well.
*/
final def zMScore[K: Schema](key: K, keys: K*): G[Chunk[Option[Double]]] = {
val command =
RedisCommand(ZMScore, NonEmptyList(ArbitraryKeyInput[K]()), ChunkOutput(OptionalOutput(DoubleOutput)))
RedisCommand(ZMScore, NonEmptyList(ArbitraryKeyInput[K]()), ChunkOutput(OptionalOutput(DoubleOrInfinity)))
command.run((key, keys.toList))
}

Expand All @@ -462,7 +465,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
val command = RedisCommand(
ZPopMax,
Tuple2(ArbitraryKeyInput[K](), OptionalInput(LongInput)),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((key, count))
Expand All @@ -487,7 +490,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
val command = RedisCommand(
ZPopMin,
Tuple2(ArbitraryKeyInput[K](), OptionalInput(LongInput)),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((key, count))
Expand Down Expand Up @@ -550,7 +553,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
val command = RedisCommand(
ZRandMember,
Tuple3(ArbitraryKeyInput[K](), LongInput, WithScoresInput),
ZRandMemberTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ZRandMemberTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)

Expand Down Expand Up @@ -593,7 +596,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
val command = RedisCommand(
ZRange,
Tuple3(ArbitraryKeyInput[K](), RangeInput, WithScoresInput),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((key, range, WithScores))
Expand Down Expand Up @@ -693,7 +696,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
WithScoresInput,
OptionalInput(LimitInput)
),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((key, scoreRange.min.asString, scoreRange.max.asString, WithScores, limit))
Expand Down Expand Up @@ -735,7 +738,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
RedisCommand(
ZRank,
Tuple3(ArbitraryKeyInput[K](), ArbitraryValueInput[M](), WithScoreInput),
OptionalOutput(Tuple2Output(LongOutput, DoubleOutput).map { case (r, s) => RankScore(r, s) })
OptionalOutput(Tuple2Output(LongOutput, DoubleOrInfinity).map { case (r, s) => RankScore(r, s) })
)
command.run((key, member, WithScore))
}
Expand Down Expand Up @@ -849,7 +852,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
val command = RedisCommand(
ZRevRange,
Tuple3(ArbitraryKeyInput[K](), RangeInput, WithScoresInput),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((key, range, WithScores))
Expand Down Expand Up @@ -953,7 +956,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
WithScoresInput,
OptionalInput(LimitInput)
),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((key, scoreRange.max.asString, scoreRange.min.asString, WithScores, limit))
Expand Down Expand Up @@ -993,7 +996,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
val command = RedisCommand(
ZRevRank,
Tuple3(ArbitraryKeyInput[K](), ArbitraryValueInput[M](), WithScoreInput),
OptionalOutput(Tuple2Output(LongOutput, DoubleOutput).map { case (r, s) => RankScore(r, s) })
OptionalOutput(Tuple2Output(LongOutput, DoubleOrInfinity).map { case (r, s) => RankScore(r, s) })
)
command.run((key, member, WithScore))
}
Expand Down Expand Up @@ -1021,7 +1024,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
new ResultBuilder1[({ type lambda[x] = (Long, MemberScores[x]) })#lambda, G] {
def returning[M: Schema]: G[(Long, Chunk[MemberScore[M]])] = {
val memberScoresOutput =
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput).map(_.map { case (m, s) => MemberScore(m, s) })
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity).map(_.map { case (m, s) => MemberScore(m, s) })

val command =
RedisCommand(
Expand All @@ -1042,13 +1045,14 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
* @param member
* Member of sorted set
* @return
* The score of member (a double precision floating point number.
* The score of member (a double precision floating point number).
* Double.PositiveInfinity and Double.NegativeInfinity are valid scores as well.
*/
final def zScore[K: Schema, M: Schema](key: K, member: M): G[Option[Double]] = {
val command = RedisCommand(
ZScore,
Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[M]()),
OptionalOutput(DoubleOutput)
OptionalOutput(DoubleOrInfinity)
)
command.run((key, member))
}
Expand Down Expand Up @@ -1122,7 +1126,7 @@ trait SortedSets[G[+_]] extends RedisEnvironment[G] {
OptionalInput(AggregateInput),
WithScoresInput
),
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOutput)
ChunkTuple2Output(ArbitraryOutput[M](), DoubleOrInfinity)
.map(_.map { case (m, s) => MemberScore(m, s) })
)
command.run((keys.size + 1, (key, keys.toList), weights, aggregate, WithScores))
Expand Down
Loading

0 comments on commit d62e461

Please sign in to comment.