Revert unrelated changes
0pg committed Jan 31, 2023
1 parent 4034499 commit d4aa330
Showing 2 changed files with 33 additions and 78 deletions.
90 changes: 21 additions & 69 deletions redis/src/main/scala/zio/redis/ClusterExecutor.scala
@@ -18,22 +18,18 @@ package zio.redis

 import zio._
 import zio.redis.ClusterExecutor._
-import zio.redis.Input.StringInput
 import zio.redis.api.Cluster.AskingCommand
 import zio.redis.codecs.StringUtf8Codec
 import zio.redis.options.Cluster._
 import zio.schema.codec.BinaryCodec
-import zio.stream.ZStream

 import java.io.IOException

 final case class ClusterExecutor(
   clusterConnectionRef: Ref.Synchronized[ClusterConnection],
   config: RedisClusterConfig,
-  scope: Scope.Closeable,
-  codec: BinaryCodec
-) extends RedisExecutor
-    with RedisPubSub {
+  scope: Scope.Closeable
+) extends RedisExecutor {

   def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] = {

@@ -61,37 +57,30 @@ final case class ClusterExecutor(
       recover.retry(retryPolicy)
     }

     for {
-      keySlot <- extractKeySlot(command)
-      result  <- executeSafe(keySlot).provide(codecLayer)
-    } yield result
-  }
-
-  private def extractKeySlot(command: Chunk[RespValue.BulkString]) =
-    for {
       key     <- ZIO.attempt(command(1)).orElseFail(CusterKeyError)
       keySlot  = Slot((key.asCRC16 % SlotsAmount).toLong)
-    } yield keySlot
+      result  <- executeSafe(keySlot)
+    } yield result
+  }

   private def executor(slot: Slot): IO[RedisError.IOError, RedisExecutor] =
     clusterConnectionRef.get.map(_.executor(slot)).flatMap(ZIO.fromOption(_).orElseFail(CusterKeyExecutorError))

   // TODO introduce max connection amount
-  private def executor(address: RedisUri) =
+  private def executor(address: RedisUri): IO[RedisError.IOError, RedisExecutor] =
     clusterConnectionRef.modifyZIO { cc =>
       val executorOpt = cc.executors.get(address).map(es => (es.executor, cc))
       val enrichedClusterIO =
-        scope
-          .extend[BinaryCodec](connectToNode(address))
-          .map(es => (es.executor, cc.addExecutor(address, es)))
+        scope.extend[Any](connectToNode(address)).map(es => (es.executor, cc.addExecutor(address, es)))
       ZIO.fromOption(executorOpt).catchAll(_ => enrichedClusterIO)
     }

-  private def refreshConnect =
+  private def refreshConnect: IO[RedisError, Unit] =
     clusterConnectionRef.updateZIO { connection =>
       val addresses = connection.partitions.flatMap(_.addresses)
       for {
-        cluster <- scope.extend[BinaryCodec](initConnectToCluster(addresses))
+        cluster <- scope.extend[Any](initConnectToCluster(addresses))
         _       <- ZIO.foreachParDiscard(connection.executors) { case (_, es) => es.scope.close(Exit.unit) }
       } yield cluster
     }
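The restored execute body above routes each command by hashing its first key argument into a cluster hash slot. Below is a minimal illustration of that mapping only: SlotsAmount = 16384 is an assumption (the standard Redis Cluster slot count), crc16 is a stand-in for the repository's BulkString.asCRC16 helper, and SlotRoutingSketch/slotOf are hypothetical names that merely mirror the expression key.asCRC16 % SlotsAmount.

// Sketch only: mirrors Slot((key.asCRC16 % SlotsAmount).toLong) from the diff above.
// SlotsAmount = 16384 and the crc16 parameter are assumptions, not repository code.
object SlotRoutingSketch {
  val SlotsAmount = 16384

  def slotOf(key: Array[Byte], crc16: Array[Byte] => Int): Long =
    (crc16(key) % SlotsAmount).toLong
}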
@@ -103,66 +92,33 @@ final case class ClusterExecutor(
       case _: RedisError.IOError | _: RedisError.ClusterRedisError => true
       case _                                                       => false
     }
-
-  private val codecLayer = ZLayer.succeed(codec)
-
-  def execute(command: RedisPubSubCommand): ZStream[BinaryCodec, RedisError, PushProtocol] = {
-    def pubSubExecutor(key: String) =
-      ZIO.serviceWithZIO[BinaryCodec] { implicit codec =>
-        extractKeySlot(StringInput.encode(key)).flatMap(slot =>
-          clusterConnectionRef.get
-            .map(_.pubSub(slot))
-            .flatMap(ZIO.fromOption(_).orElseFail(CusterKeyExecutorError))
-        )
-      }
-
-    def unsubscriptionStreams(keys: List[String]) = {
-      lazy val allPubSubExecutors = clusterConnectionRef.get.map(_.executors.values.map(_.pubSub).toList)
-      (keys.headOption match {
-        case Some(channel) => pubSubExecutor(channel).map(List(_))
-        case None          => allPubSubExecutors
-      }).map(_.map(_.execute(command)))
-        .map(_.fold(ZStream.empty)(_ merge _))
-    }
-
-    ZStream.unwrap[BinaryCodec, RedisError, PushProtocol](
-      command match {
-        case RedisPubSubCommand.Subscribe(channel, _)  => pubSubExecutor(channel).map(_.execute(command))
-        case RedisPubSubCommand.PSubscribe(pattern, _) => pubSubExecutor(pattern).map(_.execute(command))
-        case RedisPubSubCommand.Unsubscribe(channels)  => unsubscriptionStreams(channels)
-        case RedisPubSubCommand.PUnsubscribe(patterns) => unsubscriptionStreams(patterns)
-      }
-    )
-  }
 }

 object ClusterExecutor {

-  lazy val layer: ZLayer[RedisClusterConfig with BinaryCodec, RedisError, RedisExecutor with RedisPubSub] =
+  lazy val layer: ZLayer[RedisClusterConfig, RedisError, RedisExecutor] =
     ZLayer.scoped {
       for {
         config       <- ZIO.service[RedisClusterConfig]
         layerScope   <- ZIO.scope
-        codec        <- ZIO.service[BinaryCodec]
         clusterScope <- Scope.make
-        executor     <- clusterScope.extend[BinaryCodec](create(config, clusterScope, codec))
+        executor     <- clusterScope.extend[Any](create(config, clusterScope))
         _            <- layerScope.addFinalizerExit(e => clusterScope.close(e))
       } yield executor
     }

   private[redis] def create(
     config: RedisClusterConfig,
-    scope: Scope.Closeable,
-    codec: BinaryCodec
-  ) =
+    scope: Scope.Closeable
+  ): ZIO[Scope, RedisError, ClusterExecutor] =
     for {
       clusterConnection    <- initConnectToCluster(config.addresses)
       clusterConnectionRef <- Ref.Synchronized.make(clusterConnection)
-      clusterExec           = ClusterExecutor(clusterConnectionRef, config, scope, codec)
+      clusterExec           = ClusterExecutor(clusterConnectionRef, config, scope)
       _                    <- logScopeFinalizer("Cluster executor is closed")
     } yield clusterExec

-  private def initConnectToCluster(addresses: Chunk[RedisUri]) =
+  private def initConnectToCluster(addresses: Chunk[RedisUri]): ZIO[Scope, RedisError, ClusterConnection] =
     ZIO
       .collectFirst(addresses) { address =>
         connectToCluster(address).foldZIO(
@@ -187,20 +143,16 @@ object ClusterExecutor {
   private def connectToNode(address: RedisUri) =
     for {
       closableScope <- Scope.make
-      reqRepConn    <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port)))
-      pubSubConn    <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port)))
-      executor      <- closableScope.extend[Any](SingleNodeExecutor.create(reqRepConn))
-      pubSub        <- closableScope.extend[BinaryCodec](SingleNodeRedisPubSub.create(pubSubConn))
+      connection    <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port)))
+      executor      <- closableScope.extend[Any](SingleNodeExecutor.create(connection))
       layerScope    <- ZIO.scope
       _             <- layerScope.addFinalizerExit(closableScope.close(_))
-    } yield ExecutorScope(executor, pubSub, closableScope)
+    } yield ExecutorScope(executor, closableScope)

   private def redis(address: RedisUri) = {
-    val redisConfigLayer = ZLayer.succeed(RedisConfig(address.host, address.port))
-    val codecLayer       = ZLayer.succeed[BinaryCodec](StringUtf8Codec)
-    val executorLayer    = redisConfigLayer >>> RedisExecutor.layer
-    val pubSubLayer      = redisConfigLayer ++ codecLayer >>> RedisPubSub.layer
-    val redisLayer       = executorLayer ++ pubSubLayer >>> RedisLive.layer
+    val executorLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) >>> RedisExecutor.layer
+    val codecLayer    = ZLayer.succeed[BinaryCodec](StringUtf8Codec)
+    val redisLayer    = executorLayer ++ codecLayer >>> RedisLive.layer
     for {
       closableScope <- Scope.make
       layer         <- closableScope.extend[Any](redisLayer.memoize)
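With the revert in place, ClusterExecutor.layer needs only a RedisClusterConfig in the environment and yields a plain RedisExecutor, with no BinaryCodec or RedisPubSub involved. The following wiring sketch is under stated assumptions: the constructor shapes of RedisClusterConfig and RedisUri are assumed, and ClusterWiringSketch is a hypothetical name; the diff itself only shows that config.addresses is a Chunk[RedisUri] whose elements carry host and port.

import zio._
import zio.redis._

// Sketch only: provides the reverted layer with a hand-built cluster config.
object ClusterWiringSketch {
  // Assumed constructors: RedisClusterConfig(addresses) and RedisUri(host, port).
  val configLayer: ULayer[RedisClusterConfig] =
    ZLayer.succeed(RedisClusterConfig(Chunk(RedisUri("localhost", 7000), RedisUri("localhost", 7001))))

  // Reverted signature from this commit: ZLayer[RedisClusterConfig, RedisError, RedisExecutor].
  val executorLayer: Layer[RedisError, RedisExecutor] =
    configLayer >>> ClusterExecutor.layer
}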
21 changes: 12 additions & 9 deletions redis/src/main/scala/zio/redis/SingleNodeExecutor.scala
@@ -71,29 +71,32 @@ final class SingleNodeExecutor(
       .via(RespValue.decoder)
       .collectSome
       .foreach(response => resQueue.take.flatMap(_.succeed(response)))
+
 }

 object SingleNodeExecutor {
-  final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])
-
-  private final val True: Any => Boolean = _ => true
-
-  private final val RequestQueueSize = 16

   lazy val layer: ZLayer[RedisConnection, RedisError.IOError, RedisExecutor] =
     ZLayer.scoped {
       for {
-        conn     <- ZIO.service[RedisConnection]
-        executor <- create(conn)
+        connection <- ZIO.service[RedisConnection]
+        executor   <- create(connection)
       } yield executor
     }

-  private[redis] def create(conn: RedisConnection): URIO[Scope, SingleNodeExecutor] =
+  final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])
+
+  private final val True: Any => Boolean = _ => true
+
+  private final val RequestQueueSize = 16
+
+  private[redis] def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] =
     for {
       reqQueue <- Queue.bounded[Request](RequestQueueSize)
       resQueue <- Queue.unbounded[Promise[RedisError, RespValue]]
-      executor  = new SingleNodeExecutor(reqQueue, resQueue, conn)
+      executor  = new SingleNodeExecutor(reqQueue, resQueue, connection)
       _        <- executor.run.forkScoped
       _        <- logScopeFinalizer(s"$executor Node Executor is closed")
     } yield executor
+
 }
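For comparison, SingleNodeExecutor.layer (only renamed bindings and reordered members in this hunk) still turns a RedisConnection into a RedisExecutor. Below is a rough sketch of that single-node wiring; it assumes RedisConnectionLive.create is a scoped constructor failing with RedisError.IOError, as the surrounding signatures in this commit suggest, and SingleNodeWiringSketch is a hypothetical name.

import zio._
import zio.redis._

// Sketch only: builds the connection the same way connectToNode does in ClusterExecutor,
// then feeds it to SingleNodeExecutor.layer.
object SingleNodeWiringSketch {
  val connectionLayer: ZLayer[Any, RedisError.IOError, RedisConnection] =
    ZLayer.scoped(RedisConnectionLive.create(RedisConfig("localhost", 6379)))

  val executorLayer: ZLayer[Any, RedisError.IOError, RedisExecutor] =
    connectionLayer >>> SingleNodeExecutor.layer
}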
