diff --git a/redis/src/main/scala/zio/redis/ClusterExecutor.scala b/redis/src/main/scala/zio/redis/ClusterExecutor.scala index 002013233..2b530c357 100644 --- a/redis/src/main/scala/zio/redis/ClusterExecutor.scala +++ b/redis/src/main/scala/zio/redis/ClusterExecutor.scala @@ -18,22 +18,18 @@ package zio.redis import zio._ import zio.redis.ClusterExecutor._ -import zio.redis.Input.StringInput import zio.redis.api.Cluster.AskingCommand import zio.redis.codecs.StringUtf8Codec import zio.redis.options.Cluster._ import zio.schema.codec.BinaryCodec -import zio.stream.ZStream import java.io.IOException final case class ClusterExecutor( clusterConnectionRef: Ref.Synchronized[ClusterConnection], config: RedisClusterConfig, - scope: Scope.Closeable, - codec: BinaryCodec -) extends RedisExecutor - with RedisPubSub { + scope: Scope.Closeable +) extends RedisExecutor { def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] = { @@ -61,37 +57,30 @@ final case class ClusterExecutor( recover.retry(retryPolicy) } - for { - keySlot <- extractKeySlot(command) - result <- executeSafe(keySlot).provide(codecLayer) - } yield result - } - - private def extractKeySlot(command: Chunk[RespValue.BulkString]) = for { key <- ZIO.attempt(command(1)).orElseFail(CusterKeyError) keySlot = Slot((key.asCRC16 % SlotsAmount).toLong) - } yield keySlot + result <- executeSafe(keySlot) + } yield result + } private def executor(slot: Slot): IO[RedisError.IOError, RedisExecutor] = clusterConnectionRef.get.map(_.executor(slot)).flatMap(ZIO.fromOption(_).orElseFail(CusterKeyExecutorError)) // TODO introduce max connection amount - private def executor(address: RedisUri) = + private def executor(address: RedisUri): IO[RedisError.IOError, RedisExecutor] = clusterConnectionRef.modifyZIO { cc => val executorOpt = cc.executors.get(address).map(es => (es.executor, cc)) val enrichedClusterIO = - scope - .extend[BinaryCodec](connectToNode(address)) - .map(es => (es.executor, cc.addExecutor(address, es))) + scope.extend[Any](connectToNode(address)).map(es => (es.executor, cc.addExecutor(address, es))) ZIO.fromOption(executorOpt).catchAll(_ => enrichedClusterIO) } - private def refreshConnect = + private def refreshConnect: IO[RedisError, Unit] = clusterConnectionRef.updateZIO { connection => val addresses = connection.partitions.flatMap(_.addresses) for { - cluster <- scope.extend[BinaryCodec](initConnectToCluster(addresses)) + cluster <- scope.extend[Any](initConnectToCluster(addresses)) _ <- ZIO.foreachParDiscard(connection.executors) { case (_, es) => es.scope.close(Exit.unit) } } yield cluster } @@ -103,66 +92,33 @@ final case class ClusterExecutor( case _: RedisError.IOError | _: RedisError.ClusterRedisError => true case _ => false } - - private val codecLayer = ZLayer.succeed(codec) - - def execute(command: RedisPubSubCommand): ZStream[BinaryCodec, RedisError, PushProtocol] = { - def pubSubExecutor(key: String) = - ZIO.serviceWithZIO[BinaryCodec] { implicit codec => - extractKeySlot(StringInput.encode(key)).flatMap(slot => - clusterConnectionRef.get - .map(_.pubSub(slot)) - .flatMap(ZIO.fromOption(_).orElseFail(CusterKeyExecutorError)) - ) - } - - def unsubscriptionStreams(keys: List[String]) = { - lazy val allPubSubExecutors = clusterConnectionRef.get.map(_.executors.values.map(_.pubSub).toList) - (keys.headOption match { - case Some(channel) => pubSubExecutor(channel).map(List(_)) - case None => allPubSubExecutors - }).map(_.map(_.execute(command))) - .map(_.fold(ZStream.empty)(_ merge _)) - } - - ZStream.unwrap[BinaryCodec, RedisError, PushProtocol]( - command match { - case RedisPubSubCommand.Subscribe(channel, _) => pubSubExecutor(channel).map(_.execute(command)) - case RedisPubSubCommand.PSubscribe(pattern, _) => pubSubExecutor(pattern).map(_.execute(command)) - case RedisPubSubCommand.Unsubscribe(channels) => unsubscriptionStreams(channels) - case RedisPubSubCommand.PUnsubscribe(patterns) => unsubscriptionStreams(patterns) - } - ) - } } object ClusterExecutor { - lazy val layer: ZLayer[RedisClusterConfig with BinaryCodec, RedisError, RedisExecutor with RedisPubSub] = + lazy val layer: ZLayer[RedisClusterConfig, RedisError, RedisExecutor] = ZLayer.scoped { for { config <- ZIO.service[RedisClusterConfig] layerScope <- ZIO.scope - codec <- ZIO.service[BinaryCodec] clusterScope <- Scope.make - executor <- clusterScope.extend[BinaryCodec](create(config, clusterScope, codec)) + executor <- clusterScope.extend[Any](create(config, clusterScope)) _ <- layerScope.addFinalizerExit(e => clusterScope.close(e)) } yield executor } private[redis] def create( config: RedisClusterConfig, - scope: Scope.Closeable, - codec: BinaryCodec - ) = + scope: Scope.Closeable + ): ZIO[Scope, RedisError, ClusterExecutor] = for { clusterConnection <- initConnectToCluster(config.addresses) clusterConnectionRef <- Ref.Synchronized.make(clusterConnection) - clusterExec = ClusterExecutor(clusterConnectionRef, config, scope, codec) + clusterExec = ClusterExecutor(clusterConnectionRef, config, scope) _ <- logScopeFinalizer("Cluster executor is closed") } yield clusterExec - private def initConnectToCluster(addresses: Chunk[RedisUri]) = + private def initConnectToCluster(addresses: Chunk[RedisUri]): ZIO[Scope, RedisError, ClusterConnection] = ZIO .collectFirst(addresses) { address => connectToCluster(address).foldZIO( @@ -187,20 +143,16 @@ object ClusterExecutor { private def connectToNode(address: RedisUri) = for { closableScope <- Scope.make - reqRepConn <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port))) - pubSubConn <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port))) - executor <- closableScope.extend[Any](SingleNodeExecutor.create(reqRepConn)) - pubSub <- closableScope.extend[BinaryCodec](SingleNodeRedisPubSub.create(pubSubConn)) + connection <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port))) + executor <- closableScope.extend[Any](SingleNodeExecutor.create(connection)) layerScope <- ZIO.scope _ <- layerScope.addFinalizerExit(closableScope.close(_)) - } yield ExecutorScope(executor, pubSub, closableScope) + } yield ExecutorScope(executor, closableScope) private def redis(address: RedisUri) = { - val redisConfigLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) - val codecLayer = ZLayer.succeed[BinaryCodec](StringUtf8Codec) - val executorLayer = redisConfigLayer >>> RedisExecutor.layer - val pubSubLayer = redisConfigLayer ++ codecLayer >>> RedisPubSub.layer - val redisLayer = executorLayer ++ pubSubLayer >>> RedisLive.layer + val executorLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) >>> RedisExecutor.layer + val codecLayer = ZLayer.succeed[BinaryCodec](StringUtf8Codec) + val redisLayer = executorLayer ++ codecLayer >>> RedisLive.layer for { closableScope <- Scope.make layer <- closableScope.extend[Any](redisLayer.memoize) diff --git a/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala b/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala index 1708395b6..a1ccdcf97 100644 --- a/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala +++ b/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala @@ -71,29 +71,32 @@ final class SingleNodeExecutor( .via(RespValue.decoder) .collectSome .foreach(response => resQueue.take.flatMap(_.succeed(response))) + } object SingleNodeExecutor { - final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue]) - - private final val True: Any => Boolean = _ => true - - private final val RequestQueueSize = 16 lazy val layer: ZLayer[RedisConnection, RedisError.IOError, RedisExecutor] = ZLayer.scoped { for { - conn <- ZIO.service[RedisConnection] - executor <- create(conn) + connection <- ZIO.service[RedisConnection] + executor <- create(connection) } yield executor } - private[redis] def create(conn: RedisConnection): URIO[Scope, SingleNodeExecutor] = + final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue]) + + private final val True: Any => Boolean = _ => true + + private final val RequestQueueSize = 16 + + private[redis] def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] = for { reqQueue <- Queue.bounded[Request](RequestQueueSize) resQueue <- Queue.unbounded[Promise[RedisError, RespValue]] - executor = new SingleNodeExecutor(reqQueue, resQueue, conn) + executor = new SingleNodeExecutor(reqQueue, resQueue, connection) _ <- executor.run.forkScoped _ <- logScopeFinalizer(s"$executor Node Executor is closed") } yield executor + }