-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement PubSub api #714
Implement PubSub api #714
Changes from 51 commits
a280504
cd4a3f3
a01213d
9541519
ba89a59
9723182
65b524b
892ba20
43edb47
a6f74fa
c7decb7
b6438f0
5fb75e4
ce97f95
a25805b
fd0b303
7cb1491
2ab2db1
87eee71
f532e4b
e9e2a3c
014d2e4
49c141b
8b6ce73
f44c7fc
758ea9b
9febcf8
92aea5e
3fc77c0
3c8a307
0ea75a1
ef3b9f1
478577b
c4105ff
f432815
149ae4b
67652e6
8606409
6a3f68f
1a0862f
92457c4
bf52e26
c027295
a154683
592d327
386fefd
415f75e
1699211
be11d67
552e002
e218b08
ac85cb9
dadb5e7
88c3f4e
6e50d4d
5ca98e7
539ef3f
63cc263
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright 2021 John A. De Goes and the ZIO contributors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package zio.redis | ||
|
||
import zio._ | ||
import zio.redis.internal.SubscriptionExecutor | ||
|
||
trait RedisSubscription extends api.Subscription | ||
mijicd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
object RedisSubscription { | ||
lazy val local: ZLayer[CodecSupplier, RedisError.IOError, RedisSubscription] = | ||
SubscriptionExecutor.local >>> makeLayer | ||
|
||
lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, RedisSubscription] = | ||
SubscriptionExecutor.layer >>> makeLayer | ||
|
||
private def makeLayer: URLayer[CodecSupplier & SubscriptionExecutor, RedisSubscription] = | ||
ZLayer.fromFunction(Live.apply _) | ||
|
||
private final case class Live(codecSupplier: CodecSupplier, executor: SubscriptionExecutor) extends RedisSubscription | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2021 John A. De Goes and the ZIO contributors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package zio.redis.api | ||
|
||
import zio.redis.Input._ | ||
import zio.redis.Output._ | ||
import zio.redis._ | ||
import zio.redis.internal.{RedisCommand, RedisEnvironment} | ||
import zio.schema.Schema | ||
import zio.{Chunk, IO} | ||
|
||
trait Publishing extends RedisEnvironment { | ||
import Publishing._ | ||
|
||
final def publish[A: Schema](channel: String, message: A): IO[RedisError, Long] = { | ||
val command = RedisCommand(Publish, Tuple2(StringInput, ArbitraryKeyInput[A]()), LongOutput, executor) | ||
command.run((channel, message)) | ||
} | ||
|
||
final def pubSubChannels(pattern: String): IO[RedisError, Chunk[String]] = { | ||
val command = RedisCommand(PubSubChannels, StringInput, ChunkOutput(MultiStringOutput), executor) | ||
command.run(pattern) | ||
} | ||
|
||
final def pubSubNumPat: IO[RedisError, Long] = { | ||
val command = RedisCommand(PubSubNumPat, NoInput, LongOutput, executor) | ||
command.run(()) | ||
} | ||
|
||
final def pubSubNumSub(channel: String, channels: String*): IO[RedisError, Chunk[NumberOfSubscribers]] = { | ||
val command = RedisCommand(PubSubNumSub, NonEmptyList(StringInput), NumSubResponseOutput, executor) | ||
command.run((channel, channels.toList)) | ||
} | ||
} | ||
|
||
private[redis] object Publishing { | ||
final val Publish = "PUBLISH" | ||
final val PubSubChannels = "PUBSUB CHANNELS" | ||
final val PubSubNumPat = "PUBSUB NUMPAT" | ||
final val PubSubNumSub = "PUBSUB NUMSUB" | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,123 @@ | ||||||||||
/* | ||||||||||
* Copyright 2021 John A. De Goes and the ZIO contributors | ||||||||||
* | ||||||||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||
* you may not use this file except in compliance with the License. | ||||||||||
* You may obtain a copy of the License at | ||||||||||
* | ||||||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||
* | ||||||||||
* Unless required by applicable law or agreed to in writing, software | ||||||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||
* See the License for the specific language governing permissions and | ||||||||||
* limitations under the License. | ||||||||||
*/ | ||||||||||
|
||||||||||
package zio.redis.api | ||||||||||
|
||||||||||
import zio.redis.ResultBuilder.ResultStreamBuilder1 | ||||||||||
import zio.redis._ | ||||||||||
import zio.redis.api.Subscription.PubSubCallback | ||||||||||
import zio.redis.internal._ | ||||||||||
import zio.schema.Schema | ||||||||||
import zio.stream.Stream | ||||||||||
import zio.{Chunk, IO, UIO} | ||||||||||
|
||||||||||
trait Subscription extends SubscribeEnvironment { | ||||||||||
|
||||||||||
final def subscribe(channel: String): ResultStreamBuilder1[Id] = | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add doc comments for the public API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added doc |
||||||||||
subscribeWithCallback(channel)(None, None) | ||||||||||
|
||||||||||
final def subscribeWithCallback(channel: String)( | ||||||||||
onSubscribe: Option[PubSubCallback], | ||||||||||
onUnsubscribe: Option[PubSubCallback] | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about giving default values here and removing
Suggested change
The same suggestion applies to other methods with optional callback parameters. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah it seems to make more ergonomic api. I changed to use |
||||||||||
): ResultStreamBuilder1[Id] = | ||||||||||
new ResultStreamBuilder1[Id] { | ||||||||||
def returning[R: Schema]: Stream[RedisError, R] = | ||||||||||
RedisSubscriptionCommand(executor) | ||||||||||
.subscribe( | ||||||||||
Chunk.single(channel), | ||||||||||
onSubscribe, | ||||||||||
onUnsubscribe | ||||||||||
) | ||||||||||
.map(_._2) | ||||||||||
} | ||||||||||
|
||||||||||
final def subscribe( | ||||||||||
channel: String, | ||||||||||
channels: String* | ||||||||||
): ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] = | ||||||||||
subscribeWithCallback(channel, channels: _*)(None, None) | ||||||||||
|
||||||||||
final def subscribeWithCallback(channel: String, channels: String*)( | ||||||||||
onSubscribe: Option[PubSubCallback], | ||||||||||
onUnsubscribe: Option[PubSubCallback] | ||||||||||
): ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] = | ||||||||||
new ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] { | ||||||||||
def returning[R: Schema]: Stream[RedisError, (String, R)] = | ||||||||||
RedisSubscriptionCommand(executor).subscribe( | ||||||||||
Chunk.fromIterable(channel +: channels), | ||||||||||
onSubscribe, | ||||||||||
onUnsubscribe | ||||||||||
) | ||||||||||
} | ||||||||||
|
||||||||||
final def pSubscribe(pattern: String): ResultStreamBuilder1[Id] = | ||||||||||
pSubscribeWithCallback(pattern)(None, None) | ||||||||||
|
||||||||||
final def pSubscribeWithCallback( | ||||||||||
pattern: String | ||||||||||
)( | ||||||||||
onSubscribe: Option[PubSubCallback], | ||||||||||
onUnsubscribe: Option[PubSubCallback] | ||||||||||
): ResultStreamBuilder1[Id] = | ||||||||||
new ResultStreamBuilder1[Id] { | ||||||||||
def returning[R: Schema]: Stream[RedisError, R] = | ||||||||||
RedisSubscriptionCommand(executor) | ||||||||||
.pSubscribe( | ||||||||||
Chunk.single(pattern), | ||||||||||
onSubscribe, | ||||||||||
onUnsubscribe | ||||||||||
) | ||||||||||
.map(_._2) | ||||||||||
} | ||||||||||
|
||||||||||
final def pSubscribe( | ||||||||||
pattern: String, | ||||||||||
patterns: String* | ||||||||||
Comment on lines
+119
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type-safety could be improved. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you suggesting that defines the parameter types for channels and patterns? |
||||||||||
): ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] = | ||||||||||
pSubscribeWithCallback(pattern, patterns: _*)(None, None) | ||||||||||
|
||||||||||
final def pSubscribeWithCallback( | ||||||||||
pattern: String, | ||||||||||
patterns: String* | ||||||||||
)( | ||||||||||
onSubscribe: Option[PubSubCallback], | ||||||||||
onUnsubscribe: Option[PubSubCallback] | ||||||||||
): ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] = | ||||||||||
new ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] { | ||||||||||
def returning[R: Schema]: Stream[RedisError, (String, R)] = | ||||||||||
RedisSubscriptionCommand(executor).pSubscribe( | ||||||||||
Chunk.fromIterable(pattern +: patterns), | ||||||||||
onSubscribe, | ||||||||||
onUnsubscribe | ||||||||||
) | ||||||||||
} | ||||||||||
|
||||||||||
final def unsubscribe(channels: String*): IO[RedisError, Unit] = | ||||||||||
RedisSubscriptionCommand(executor).unsubscribe(Chunk.fromIterable(channels)) | ||||||||||
|
||||||||||
final def pUnsubscribe(patterns: String*): IO[RedisError, Unit] = | ||||||||||
RedisSubscriptionCommand(executor).pUnsubscribe(Chunk.fromIterable(patterns)) | ||||||||||
|
||||||||||
} | ||||||||||
|
||||||||||
object Subscription { | ||||||||||
type PubSubCallback = (String, Long) => UIO[Unit] | ||||||||||
|
||||||||||
final val Subscribe = "SUBSCRIBE" | ||||||||||
final val Unsubscribe = "UNSUBSCRIBE" | ||||||||||
final val PSubscribe = "PSUBSCRIBE" | ||||||||||
final val PUnsubscribe = "PUNSUBSCRIBE" | ||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright 2021 John A. De Goes and the ZIO contributors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package zio.redis.internal | ||
|
||
object PubSub { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just mark the whole object as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I marked it to them |
||
private[redis] sealed trait PushMessage { | ||
def key: SubscriptionKey | ||
} | ||
|
||
private[redis] object PushMessage { | ||
final case class Subscribed(key: SubscriptionKey, numOfSubs: Long) extends PushMessage | ||
final case class Unsubscribed(key: SubscriptionKey, numOfSubs: Long) extends PushMessage | ||
final case class Message(key: SubscriptionKey, destChannel: String, message: RespValue) extends PushMessage | ||
} | ||
|
||
private[redis] sealed trait SubscriptionKey { | ||
def value: String | ||
} | ||
|
||
private[redis] object SubscriptionKey { | ||
final case class Channel(value: String) extends SubscriptionKey | ||
final case class Pattern(value: String) extends SubscriptionKey | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that there is an order in the response, but maybe you should use a
Map
instead (for easier access to the value of the specific channel).