Skip to content

Commit

Permalink
More improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardocustodio committed Sep 6, 2024
1 parent f72d71f commit ea3d73f
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 49 deletions.
9 changes: 8 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
/target
debug/
target/
Cargo.lock
**/*.rs.bk
*.pdb
.idea
.vscode
.cargo
110 changes: 109 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,109 @@
# wallet-daemon-new
# Enjin Wallet Daemon

A daemon for automatically signing transactions submitted to the Enjin Platform.

[![License: LGPL 3.0](https://img.shields.io/badge/license-LGPL_3.0-purple)](https://opensource.org/license/lgpl-3-0/)

## Functionality

The wallet will poll every 6 seconds for new transactions.

Each transaction will be signed and submitted to a node. If the transaction fails the wallet retries using exponential backoff a couple of times until it's dropped.

The wallet keeps track of the nonce locally so more than 1 transaction can be submitted by block.

## Storage for keys

Only local storage for keys is supported, the BIP-39 encoding of the key is supported and password is required.

The path to the key storage is defined in the configuration file as `master_key`. If the storage doesn't exist it's created otherwise it's read.

The password is shared through the `KEY_PASS` env variable.

Should the password be wrong the wallet will panic.

## Configuration

* By default it will use any `config.*` file supported by the crate [`config`](https://crates.io/crates/config) the file can be overriden with the env variable `CONFIG_FILE`.
* The config file should have the following fields:
* `node`: URI for the node to connect.
* `api`: The URI for the platform API.
* `master_key`: path to where the key is or will be stored.
* The key that will sign the transactions will be created and stored the first time the program is run.
* If the key already exists it will be loaded.
* The API Key for the platform should be stored in the `PLATFORM_KEY` env var.

## Compile the wallet

Simply run:

`cargo build --release`

If you are going to run it without a relay chain, for testing purposes, use this instead (normally used for local integration tests):

`cargo build --features local`

Note that right now the platform only correctly encodes transactions for the matrix chain not for the relay chain so it won't be used.

## Architecture

* The `lib` directory is where most of the interesting code lives.
* In `bin` you will only find the startup code and is where the binary is created.

Externally the main things in `lib` are the "jobs" (in the aptly named module `jobs`) which are tasks run in the background:
* `PollJob` which continually polls the platform and sends the responses to the other job.
* `SignProccesor` which signs the received transaction requests it's sent and gives the txHash back to the platform.

You will also need to load the configuration to pass to the jobs with the `load_config` in `config_loader` module.

From inside `lib`, other than the `job` and `config_loader`, the important modules are:
* `wallet_trait` which contains multiple traits for defining a wallet (which in theory could work with any chain) and an implementation specific for the enjin blockchain.
* `chain_connection` traits related to a long-lived connection to a blockchain to submit transactions (which, again, in theory could work with any blockchain) and an implementation specific for any substrate-based chain.
* `connection_handler` contains a few wrappers to make it easier to work with long-lived connections, things like handling reconnection and re-submissions.

The `bin` uses the jobs and the configuration loading to execute the following:
1. Load the configuration, including the key, which it creates if it doesn't exist.
1. Start off the jobs which then:
1. Continuously polls the platform for new transactions, the polled transactions are mutated to `PROCCESING`.
1. Sign a transaction request when it is received.
1. Send the signed transaction to the blockchain.
1. The received transaction hash is submitted to the platform changing the state of the transaction to `BROADCAST`.
1. When `ctrl-c` is pressed stop the program.

## Deployment

For deployment there is a docker image in the `docker` directory that's built using the `build.sh` script.

The docker image is tagged `enjin/wallet-daemon`.

The image start command is the script in `data/start.sh`.

It requires a volume mounted in `/opt/app/config` with the `config.json` file (with the details corresponding to the deployment).

Furthemore, the `config.json` requires the `master_key` entry to be `/opt/app/storage/` (For an example look at the `config.json` in the `docker` directory).

Also it expects the following secrets to be passed in the order that they are listed here:
You will need to generate a key locally by running the daemon with `cargo run --release` (with the `KEY_PASS` env variable set to something safe), to see an example of the generated key take a look at the `store` directory and use that key to set these secrets.
* `STORE_NAME`: the storage name, this is a hex number which is the file name where the key is stored, it's generated when the key is generate (In the current example: `73723235301cb3057d43941d5f631613aa1661be0354d39e34f23d4ef527396b10d2bb7a`).
* `SEED_PHRASE`: These are the content of the key file, which are the bip-39 words used to generate the key. (In the current example: "duty captain man fantasy angry window release hammer suspect bullet panda special").
* `KEY_PASS`: The pass of the key which when originally generated is set through the `KEY_PASS` env variable. (In the current example: `example`).
* `PLATFORM_KEY`: The platform key is the API token used to authenticate the wallet daemon so it can request new transactions from the platform to sign.

### Example:
```
docker run -v localDirectoryWithConfigFile:/opt/app/config \
-e CONFIG_FILE="/opt/app/config/config.json" \
efinity/wallet \
"73723235301cb3057d43941d5f631613aa1661be0354d39e34f23d4ef527396b10d2bb7a" \
"duty captain man fantasy angry window release hammer suspect bullet panda special" \
example \
bFwNECHZlzsQsrSudzWymXhalcViuPwKXFfnPYsm
```

Another important thing to note when the seed phrase is written to the file, there is something weird that can happen with `printf` so if there is an error reading the key when deploying try defining the env variable `ADD_QUOTES` (or undefining it if it exists), if you want to know more about this read the comment in the `start.sh` script.

**REMEMBER NOT TO USE ANY OF THE KEYS IN THIS REPOSITORY FOR AN ACTUAL DEPLOYMENT**

## License

The LGPL 3.0 License. Please see [License File](LICENSE) for more information.
5 changes: 4 additions & 1 deletion src/platform_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ pub async fn update_transaction(
transaction.state,
);
}
Err(e) => tracing::error!("Error decoding response of the platform: {:?}", e),
Err(e) => {
tracing::error!("Error decoding response of the platform: {:?}", e);
tracing::error!("Response body: {:?}", res);
}
},
Err(e) => tracing::error!("Error sending UpdateTransaction: {:?}", e),
}
Expand Down
128 changes: 82 additions & 46 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use backoff::exponential::ExponentialBackoff;
use backoff::SystemClock;
use subxt::backend::rpc::RpcClient;
use subxt::config::substrate::{BlakeTwo256, SubstrateHeader};
use subxt::config::DefaultExtrinsicParamsBuilder as Params;
use subxt::tx::Signer;
use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig};
use subxt::ext::codec::Encode;
use subxt_signer::sr25519::Keypair;
use subxt_signer::DeriveJunction;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -53,6 +56,7 @@ impl TryFrom<mark_and_list_pending_transactions::MarkAndListPendingTransactionsM
type Error = Box<dyn std::error::Error + Send + Sync>;

fn try_from(edge: mark_and_list_pending_transactions::MarkAndListPendingTransactionsMarkAndListPendingTransactionsEdges) -> Result<Self, Self::Error> {
tracing::info!("{:?}", edge);
let external_id = edge.node.wallet.as_ref().and_then(|w| w.external_id.clone());

Ok(Self {
Expand Down Expand Up @@ -205,14 +209,14 @@ impl TransactionJob {
.collect())
}
}

#[derive(AsyncIncremental, PartialEq, Eq, Debug)]
struct Nonce(u64);

struct EnjinWallet {
nonce: Nonce,
players_nonce: Mutex<LruCache<DeriveJunction, Nonce>>,
}
//
// #[derive(AsyncIncremental, PartialEq, Eq, Debug)]
// struct Nonce(u64);
//
// struct EnjinWallet {
// nonce: Nonce,
// players_nonce: Mutex<LruCache<DeriveJunction, Nonce>>,
// }

pub struct TransactionProcessor {
chain_client: Arc<OnlineClient<PolkadotConfig>>,
Expand Down Expand Up @@ -251,36 +255,52 @@ impl TransactionProcessor {
platform_token: String,
chain_client: Arc<OnlineClient<PolkadotConfig>>,
keypair: Keypair,
nonce: u64,
nonce: Arc<Mutex<u64>>,
request_id: i64,
payload: Vec<u8>,
block_header: SubstrateHeader<u32, BlakeTwo256>,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let params = Params::new().nonce(nonce).mortal(&block_header, 32).build();
// Let's correct the nonce here
let chain_nonce = chain_client.tx().account_nonce(&keypair.public_key().into()).await.unwrap();
let correct_nonce;
{
let mut latest_nonce = nonce.lock().unwrap();
correct_nonce = latest_nonce.max(chain_nonce);

tracing::warn!("Using nonce: {correct_nonce:?} - Cached nonce: {latest_nonce:?} - Metadata nonce: {chain_nonce:?} - Next nonce: {:?}", correct_nonce + 1);

*latest_nonce = correct_nonce + 1;
}



let params = Params::new().nonce(correct_nonce).mortal(&block_header, 16).build();

// We probably need to try to create the tx (to check if it is valid before grabbing a nonce for it
let mut transaction = chain_client
.tx()
.create_signed(&Wrapper(payload), &keypair, params)
.await?
.submit_and_watch()
.await?;

// TODO: There is a bug in the platform where we can pass the hash twice
// let hash = format!("0x{}", hex::encode(transaction.extrinsic_hash().0));
// let encoded_tx = hex::encode(&transaction.);
// let mut sub = tx.submit_and_watch().await?;

while let Some(status) = transaction.next().await {
match status? {
TxStatus::Validated => {
let trimmed = trim_account(hex::encode(keypair.public_key().0));
tracing::info!(
"Sent transaction #{} with nonce {} signed by {}",
request_id,
nonce,
trimmed
);
"Sent transaction #{} with nonce {} signed by {}",
request_id,
correct_nonce,
trimmed
);
}
TxStatus::Invalid { message } => {
tracing::error!("Transaction #{} is INVALID: {:?}", request_id, message);
// tracing::error!("Full transaction: {}", encoded_tx);
}
TxStatus::Broadcasted { num_peers: _ } => {
tracing::info!("Transaction #{} has been BROADCASTED", request_id);
Expand All @@ -296,38 +316,40 @@ impl TransactionProcessor {
signed_at: Some(block_header.number as i64),
},
)
.await;
.await;
}
TxStatus::InBestBlock(block) => {
tracing::info!(
"Transaction #{} is now InBestBlock: {:?}",
request_id,
block.block_hash()
);
"Transaction #{} is now InBestBlock: {:?}",
request_id,
block.block_hash()
);
return Ok(hex::encode(block.extrinsic_hash().0));
}
TxStatus::NoLongerInBestBlock => {
tracing::error!("Transaction #{} no longer InBestBlock", request_id)
}
TxStatus::Dropped { message } => {
tracing::error!(
"Transaction #{} has been DROPPED: {:?}",
request_id,
message
)
"Transaction #{} has been DROPPED: {:?}",
request_id,
message
)
}
TxStatus::InFinalizedBlock(in_block) => tracing::info!(
"Transaction #{} with hash {:?} was included at block: {:?}",
request_id,
in_block.extrinsic_hash(),
in_block.block_hash()
),
"Transaction #{} with hash {:?} was included at block: {:?}",
request_id,
in_block.extrinsic_hash(),
in_block.block_hash()
),
TxStatus::Error { message } => {
tracing::error!("Transaction #{} has an ERROR: {:?}", request_id, message)
}
}
}

// TODO: There is a bug in the platform where we can pass the hash twice
// let hash = format!("0x{}", hex::encode(transaction.extrinsic_hash().0));
Err(format!("Transaction #{} could not be signed or sent", request_id).into())
}

Expand All @@ -336,7 +358,7 @@ impl TransactionProcessor {
platform_client: Client,
block_subscription: Arc<BlockSubscription>,
keypair: Keypair,
nonce: Arc<AsyncIncrement<Nonce>>,
nonce: Arc<Mutex<u64>>,
platform_url: String,
platform_token: String,
TransactionRequest {
Expand All @@ -346,31 +368,26 @@ impl TransactionProcessor {
payload,
}: TransactionRequest,
) {
let setting = backoff::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(6))
.with_randomization_factor(0.2)
.with_multiplier(2.0)
.with_max_elapsed_time(Some(Duration::from_secs(120)))
.build();

let signer = if external_id.is_some() {
keypair.derive([DeriveJunction::soft(external_id)])
} else {
keypair
};

let nonce_value = nonce.pull();
let value = nonce_value.0.clone();
// let nonce_value = nonce.pull();
// let value = nonce_value.0.clone();
// let value = 0;
let block_header = block_subscription.get_block_header();
// tracing::info!("Pulling nonce {} for transaction {}", value, request_id);

let res = backoff::future::retry(setting, || async {
let res = backoff::future::retry(Self::default_backoff(), || async {
match Self::submit_and_watch(
platform_client.clone(),
platform_url.clone(),
platform_token.clone(),
Arc::clone(&chain_client),
signer.clone(),
value,
Arc::clone(&nonce),
request_id,
payload.clone(),
block_header.clone(),
Expand All @@ -379,11 +396,20 @@ impl TransactionProcessor {
{
Ok(hash) => Ok(hash),
Err(e) => {
// Few possible errors
// ServerError(1010) - Invalid Transaction - Transaction is outdated
// ServerError(1012) - Transaction is temporally banned
// ServerError(1013) - Transaction already imported
// ServerError(1014) - Priority is too low
// We will reset the nonce if any error occurs
let mut nonce = nonce.lock().unwrap();
*nonce = 0;

tracing::info!("Resetting cache nonce: 0");
tracing::error!(
"Error submitting transaction #{} from account {} with nonce {}. Payload: 0x{}",
"Error submitting transaction #{} from account {} payload: 0x{}",
request_id,
trim_account(hex::encode(signer.public_key().0)),
value,
hex::encode(payload.clone())
);
tracing::error!("{:?}", e);
Expand Down Expand Up @@ -446,6 +472,16 @@ impl TransactionProcessor {
}
}

fn default_backoff() -> ExponentialBackoff<SystemClock> {
let setting = backoff::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(6))
.with_randomization_factor(0.2)
.with_multiplier(2.0)
.with_max_elapsed_time(Some(Duration::from_secs(120)))
.build();
setting
}

async fn get_initial_nonce(&self) -> u64 {
loop {
tracing::info!("Waiting for 2 blocks to get the initial nonce");
Expand All @@ -465,8 +501,8 @@ impl TransactionProcessor {
async fn launch_job_scheduler(mut self) {
// TODO: Change this as we can have many accounts that can have diff nonces
let initial_nonce = self.get_initial_nonce().await;
let nonce_tracker = Arc::new(Mutex::new(initial_nonce));

let nonce_tracker = Arc::new(Nonce(initial_nonce).init_from());
tracing::info!(
"Setting initial nonce to {} for account {}",
initial_nonce,
Expand Down

0 comments on commit ea3d73f

Please sign in to comment.