Skip to content

Commit

Permalink
feat: Implement general purpose processing pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
phinner committed Jun 21, 2023
1 parent 5b90be0 commit eac3b4b
Show file tree
Hide file tree
Showing 22 changed files with 886 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty
import kotlin.reflect.full.companionObject

// TODO:
// Create an interface LoggerAware and implement it in all classes that use the logger
// open class LoggerAsPropertyDelegate {
// protected val logger by LoggerDelegate()
// //...
// }

// https://www.baeldung.com/kotlin/logging
class LoggerDelegate<in R : Any> : ReadOnlyProperty<R, Logger> {
override fun getValue(thisRef: R, property: KProperty<*>): Logger =
LoggerFactory.getLogger(getClassForLogging(thisRef.javaClass))
Expand Down
14 changes: 11 additions & 3 deletions foundation-mindustry-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
implementation(libs.jsoup)
mindustryDependencies()
compileOnly(libs.distributor.api)
compileOnly(libs.distributor.kotlin)
}

tasks.shadowJar {
Expand Down Expand Up @@ -82,19 +83,26 @@ val downloadKotlinRuntime = tasks.register<GithubArtifactDownload>("downloadKotl
version.set(libs.versions.kotlin.map { "v2.0.0-k.$it" })
}

val downloadDistributor = tasks.register<GithubArtifactDownload>("downloadDistributor") {
val downloadDistributorCore = tasks.register<GithubArtifactDownload>("downloadDistributor") {
user.set("Xpdustry")
repo.set("Distributor")
name.set("DistributorCore.jar")
version.set(libs.versions.distributor.map { "v$it" })
}

val downloadDistributorKotlin = tasks.register<GithubArtifactDownload>("downloadDistributorKotlin") {
user.set("Xpdustry")
repo.set("Distributor")
name.set("DistributorKotlin.jar")
version.set(libs.versions.distributor.map { "v$it" })
}

tasks.runMindustryClient {
mods.setFrom()
}

tasks.runMindustryServer {
mods.setFrom(downloadKotlinRuntime, tasks.shadowJar, downloadDistributor)
mods.setFrom(downloadKotlinRuntime, tasks.shadowJar, downloadDistributorCore, downloadDistributorKotlin)
}

// Second server for testing discovery
Expand All @@ -104,5 +112,5 @@ tasks.register<MindustryExec>("runMindustryServer2") {
mainClass.set("mindustry.server.ServerLauncher")
modsPath.set("./config/mods")
standardInput = System.`in`
mods.setFrom(downloadKotlinRuntime, tasks.shadowJar, downloadDistributor)
mods.setFrom(downloadKotlinRuntime, tasks.shadowJar, downloadDistributorCore, downloadDistributorKotlin)
}
1 change: 1 addition & 0 deletions foundation-mindustry-core/plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"repo": "Xpdustry/Foundation",
"dependencies": [
"distributor-core",
"distributor-kotlin",
"kotlin-runtime"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,28 @@ package com.xpdustry.foundation.mindustry.core

import com.google.inject.Provides
import com.google.inject.Singleton
import com.google.inject.name.Named
import com.xpdustry.foundation.common.annotation.FoundationDir
import com.xpdustry.foundation.common.application.FoundationMetadata
import com.xpdustry.foundation.common.application.FoundationPlatform
import com.xpdustry.foundation.common.application.KotlinAbstractModule
import com.xpdustry.foundation.common.config.FoundationConfig
import com.xpdustry.foundation.common.network.ServerInfo
import com.xpdustry.foundation.common.version.FoundationVersion
import com.xpdustry.foundation.mindustry.core.annotation.ClientSide
import com.xpdustry.foundation.mindustry.core.annotation.ServerSide
import com.xpdustry.foundation.mindustry.core.chat.ChatMessagePipeline
import com.xpdustry.foundation.mindustry.core.chat.SimpleChatMessagePipeline
import com.xpdustry.foundation.mindustry.core.command.FoundationPluginCommandManager
import com.xpdustry.foundation.mindustry.core.security.verif.SimpleVerificationService
import com.xpdustry.foundation.mindustry.core.security.verif.VerificationService
import com.xpdustry.foundation.mindustry.core.placeholder.PlaceholderPipeline
import com.xpdustry.foundation.mindustry.core.placeholder.SimplePlaceholderManager
import com.xpdustry.foundation.mindustry.core.verification.SimpleVerificationPipeline
import com.xpdustry.foundation.mindustry.core.verification.VerificationPipeline
import java.nio.file.Path

class FoundationMindustryModule(private val plugin: FoundationPlugin) : KotlinAbstractModule() {
override fun configure() {
bind(ServerInfo::class)
.provider(MindustryServerInfoProvider::class)
.singleton()

bind(FoundationPluginCommandManager::class)
.annotated(ClientSide::class)
.instance(plugin.clientCommandManager)

bind(FoundationPluginCommandManager::class)
.annotated(ServerSide::class)
.instance(plugin.serverCommandManager)
}

@Provides @Singleton @FoundationDir
Expand All @@ -58,6 +53,20 @@ class FoundationMindustryModule(private val plugin: FoundationPlugin) : KotlinAb
FoundationVersion.parse(plugin.descriptor.version),
)

@Provides
@Named("client")
fun provideClientCommandManager(): FoundationPluginCommandManager = plugin.clientCommandManager

@Provides
@Named("server")
fun provideServerCommandManager(): FoundationPluginCommandManager = plugin.serverCommandManager

@Provides @Singleton
fun provideVerificationPipeline(): VerificationPipeline = SimpleVerificationPipeline()

@Provides @Singleton
fun provideChatPipeline(): ChatMessagePipeline = SimpleChatMessagePipeline()

@Provides @Singleton
fun provideVerificationService(): VerificationService = SimpleVerificationService()
fun providePlaceholderService(): PlaceholderPipeline = SimplePlaceholderManager()
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import com.xpdustry.foundation.common.FoundationCommonModule
import com.xpdustry.foundation.common.application.FoundationListener
import com.xpdustry.foundation.common.application.SimpleFoundationApplication
import com.xpdustry.foundation.common.misc.ExitStatus
import com.xpdustry.foundation.mindustry.core.chat.ChatMessageService
import com.xpdustry.foundation.mindustry.core.command.FoundationPluginCommandManager
import com.xpdustry.foundation.mindustry.core.listener.ConventionListener
import com.xpdustry.foundation.mindustry.core.security.verif.DdosVerification
import com.xpdustry.foundation.mindustry.core.security.verif.PunishmentVerification
import com.xpdustry.foundation.mindustry.core.verification.VerificationService
import fr.xpdustry.distributor.api.DistributorProvider
import fr.xpdustry.distributor.api.plugin.AbstractMindustryPlugin
import fr.xpdustry.distributor.api.plugin.MindustryPlugin
Expand Down Expand Up @@ -66,8 +66,8 @@ class FoundationPlugin : AbstractMindustryPlugin() {
)

application.register(ConventionListener::class)
application.register(PunishmentVerification::class)
application.register(DdosVerification::class)
application.register(VerificationService::class)
application.register(ChatMessageService::class)

application.init()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Foundation, the software collection powering the Xpdustry network.
* Copyright (C) 2023 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.foundation.mindustry.core.chat

import com.xpdustry.foundation.common.misc.LoggerDelegate
import com.xpdustry.foundation.common.misc.toValueFlux
import com.xpdustry.foundation.mindustry.core.processing.AbstractProcessorPipeline
import com.xpdustry.foundation.mindustry.core.processing.ProcessorPipeline
import mindustry.gen.Player
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

data class ChatMessageContext(
val player: Player,
val target: Player,
val message: String,
)

interface ChatMessagePipeline : ProcessorPipeline<ChatMessageContext, String>

class SimpleChatMessagePipeline : ChatMessagePipeline, AbstractProcessorPipeline<ChatMessageContext, String>() {

// TODO: The reduce function is blocking, we need to figure out how to make it async
override fun build(context: ChatMessageContext): Mono<String> =
processors.toValueFlux()
.publishOn(Schedulers.boundedElastic())
.reduce(context) { ctx, processor ->
if (ctx.message.isEmpty()) {
return@reduce ctx
}
return@reduce processor.process(ctx)
.onErrorResume { error ->
logger.error("Error while processing chat message for player ${ctx.player.name()}", error)
Mono.empty()
}
.switchIfEmpty(Mono.just(""))
.map { ctx.copy(message = it) }
.block()!!
}
.flatMap {
if (it.message.isEmpty()) Mono.empty() else Mono.just(it.message)
}

companion object {
private val logger by LoggerDelegate()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Foundation, the software collection powering the Xpdustry network.
* Copyright (C) 2023 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.foundation.mindustry.core.chat

import arc.Events
import arc.util.CommandHandler.ResponseType
import arc.util.Log
import arc.util.Time
import cloud.commandframework.arguments.standard.StringArgument
import cloud.commandframework.kotlin.extension.buildAndRegister
import com.google.inject.Inject
import com.google.inject.name.Named
import com.xpdustry.foundation.common.application.FoundationListener
import com.xpdustry.foundation.common.translator.Translator
import com.xpdustry.foundation.mindustry.core.command.FoundationPluginCommandManager
import com.xpdustry.foundation.mindustry.core.misc.MindustryScheduler
import fr.xpdustry.distributor.api.command.argument.PlayerArgument
import fr.xpdustry.distributor.api.util.Priority
import mindustry.Vars
import mindustry.game.EventType.PlayerChatEvent
import mindustry.gen.Call
import mindustry.gen.Groups
import mindustry.gen.Player
import mindustry.gen.SendChatMessageCallPacket
import mindustry.net.Administration
import mindustry.net.Packets.KickReason
import mindustry.net.ValidateException
import reactor.core.publisher.Mono

class ChatMessageService @Inject constructor(
private val pipeline: ChatMessagePipeline,
private val translator: Translator,
@Named("client")
private val clientCommandManager: FoundationPluginCommandManager,
) : FoundationListener {
override fun onFoundationInit() {
pipeline.register("translator", Priority.LOW, TranslationProcessor(translator))
pipeline.register("fuck", Priority.HIGH) { Mono.just(it.message.replace(Regex("fuck"), "****")) }

// Intercept chat messages, so they go through the async processing pipeline
Vars.net.handleServer(SendChatMessageCallPacket::class.java) { con, packet ->
interceptChatMessage(con.player, packet.message, pipeline)
}

clientCommandManager.buildAndRegister("t") {
commandDescription("Send a message to your team.")
argument(StringArgument.greedy("message"))
handler {
val sender = it.sender.player
val normalized: String = Vars.netServer.admins.filterMessage(it.sender.player, it.get("message"))
?: return@handler

Groups.player.each { target ->
if (target.team() != it.sender.player.team()) return@each
pipeline.build(ChatMessageContext(it.sender.player, target, normalized))
.publishOn(MindustryScheduler)
.subscribe { result ->
target.sendMessage(
"[#${sender.team().color}]<T> ${Vars.netServer.chatFormatter.format(sender, result)}",
sender,
result,
)
}
}
}
}

clientCommandManager.buildAndRegister("w") {
commandDescription("Send a private message to a player.")
argument(PlayerArgument.of("player"))
argument(StringArgument.greedy("message"))
handler {
val sender = it.sender.player
val target = it.get<Player>("player")
val normalized: String = Vars.netServer.admins.filterMessage(it.sender.player, it.get("message"))
?: return@handler

pipeline.build(ChatMessageContext(it.sender.player, target, normalized))
.publishOn(MindustryScheduler)
.subscribe { result ->
target.sendMessage(
"[gray]<W>[] ${Vars.netServer.chatFormatter.format(sender, result)}",
sender,
result,
)
}
}
}
}
}

private fun interceptChatMessage(player: Player, message: String, pipeline: ChatMessagePipeline) {
// do not receive chat messages from clients that are too young or not registered
if (Time.timeSinceMillis(player.con.connectTime) < 500 || !player.con.hasConnected || !player.isAdded) return

// detect and kick for foul play
if (!player.con.chatRate.allow(2000, Administration.Config.chatSpamLimit.num())) {
player.con.kick(KickReason.kick)
Vars.netServer.admins.blacklistDos(player.con.address)
return
}

if (message.length > Vars.maxTextLength) {
throw ValidateException(player, "Player has sent a message above the text limit.")
}

var normalized: String? = message.replace("\n", "")

Events.fire(PlayerChatEvent(player, normalized))

// log commands before they are handled
if (normalized!!.startsWith(Vars.netServer.clientCommands.getPrefix())) {
// log with brackets
Log.info("<&fi@: @&fr>", "&lk" + player.plainName(), "&lw$normalized")
}

// check if it's a command
val response = Vars.netServer.clientCommands.handleMessage(normalized, player)

if (response.type == ResponseType.noCommand) { // no command to handle
normalized = Vars.netServer.admins.filterMessage(player, normalized)
// suppress chat message if it's filtered out
if (normalized == null) {
return
}

// BEGIN FOUNDATION
Groups.player.each { target ->
pipeline.build(ChatMessageContext(player, target, normalized)).publishOn(MindustryScheduler).subscribe { result ->
if (target == player) {
// server console logging
Log.info("&fi@: @", "&lc" + player.plainName(), "&lw$result")
}
// invoke event for all clients but also locally
// this is required so other clients get the correct name even if they don't know who's sending it yet
Call.sendMessage(Vars.netServer.chatFormatter.format(player, result), result, player)
}
}
} else {
// a command was sent, now get the output
if (response.type != ResponseType.valid) {
val text = Vars.netServer.invalidHandler.handle(player, response)
if (text != null) {
player.sendMessage(text)
}
}
}
}
Loading

0 comments on commit eac3b4b

Please sign in to comment.