diff --git a/checkstyle.xml b/checkstyle.xml
index ca3401e..d479b6b 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -99,9 +99,15 @@
-
-
-
+
+
+
+
+
+
+
+
+
@@ -176,7 +182,9 @@
-
+
+
+
@@ -185,7 +193,7 @@
-
+
diff --git a/pom.xml b/pom.xml
index 775d414..b9a29a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
ai.preferred
venom
- 4.2.4
+ 4.2.5
jar
${project.groupId}:${project.artifactId}
diff --git a/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java b/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
index f00d7aa..215e21b 100644
--- a/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
+++ b/src/main/java/ai/preferred/venom/fetcher/AsyncFetcher.java
@@ -21,6 +21,9 @@
import ai.preferred.venom.request.HttpFetcherRequest;
import ai.preferred.venom.request.Request;
import ai.preferred.venom.response.Response;
+import ai.preferred.venom.socks.SocksConnectingIOReactor;
+import ai.preferred.venom.socks.SocksHttpRoutePlanner;
+import ai.preferred.venom.socks.SocksIOSessionStrategy;
import ai.preferred.venom.storage.FileManager;
import ai.preferred.venom.uagent.DefaultUserAgent;
import ai.preferred.venom.uagent.UserAgent;
@@ -42,11 +45,20 @@
import org.apache.http.client.utils.URIUtils;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.conn.DefaultRoutePlanner;
+import org.apache.http.impl.conn.DefaultSchemePortResolver;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.IOReactorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,13 +198,33 @@ private AsyncFetcher(final Builder builder) {
.build();
final HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create()
- .setDefaultIOReactorConfig(reactorConfig)
- .setThreadFactory(builder.threadFactory)
.setMaxConnPerRoute(builder.maxRouteConnections)
.setMaxConnTotal(builder.maxConnections)
.setSSLContext(builder.sslContext)
.setRedirectStrategy(builder.redirectStrategy);
+ if (builder.enableSocksProxy) {
+ final PoolingNHttpClientConnectionManager connectionManager;
+ try {
+ final SSLIOSessionStrategy sslioSessionStrategy = SSLIOSessionStrategy.getDefaultStrategy();
+ final Registry reg = RegistryBuilder.create()
+ .register("socks", new SocksIOSessionStrategy(sslioSessionStrategy))
+ .register("http", NoopIOSessionStrategy.INSTANCE)
+ .register("https", sslioSessionStrategy)
+ .build();
+
+ final SocksConnectingIOReactor reactor = new SocksConnectingIOReactor(reactorConfig, builder.threadFactory);
+ connectionManager = new PoolingNHttpClientConnectionManager(reactor, reg);
+ clientBuilder.setConnectionManager(connectionManager)
+ .setRoutePlanner(new SocksHttpRoutePlanner(new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE)));
+ } catch (IOReactorException e) {
+ LOGGER.error("Disabling SOCKS protocol", e);
+ clientBuilder.setDefaultIOReactorConfig(reactorConfig).setThreadFactory(builder.threadFactory);
+ }
+ } else {
+ clientBuilder.setDefaultIOReactorConfig(reactorConfig).setThreadFactory(builder.threadFactory);
+ }
+
if (builder.maxConnections < builder.maxRouteConnections) {
clientBuilder.setMaxConnTotal(builder.maxRouteConnections);
LOGGER.info("Maximum total connections will be set to {}, to match maximum route connection.",
@@ -442,6 +474,8 @@ public static final class Builder {
*/
private final List callbacks;
+ private boolean enableSocksProxy;
+
/**
* Determines whether cookie storage is allowed.
*/
@@ -555,6 +589,17 @@ private Builder() {
connectTimeout = -1;
socketTimeout = -1;
compressed = true;
+ enableSocksProxy = false;
+ }
+
+ /**
+ * Enables SOCKS protocol for proxies (socks://). Experimental.
+ *
+ * @return this
+ */
+ public Builder enableSocksProxy() {
+ enableSocksProxy = true;
+ return this;
}
/**
diff --git a/src/main/java/ai/preferred/venom/socks/SocksConnectingIOReactor.java b/src/main/java/ai/preferred/venom/socks/SocksConnectingIOReactor.java
new file mode 100644
index 0000000..01ea9e2
--- /dev/null
+++ b/src/main/java/ai/preferred/venom/socks/SocksConnectingIOReactor.java
@@ -0,0 +1,53 @@
+package ai.preferred.venom.socks;
+
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactorException;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * This IOReactor makes sure that the supplied {@link IOEventDispatch} is decorated with {@link SocksIOEventDispatch}.
+ */
+public class SocksConnectingIOReactor extends DefaultConnectingIOReactor {
+
+ /**
+ * Creates an instance of SocksConnectingIOReactor with the given configuration.
+ *
+ * @param config I/O reactor configuration.
+ * @param threadFactory the factory to create threads.
+ * Can be {@code null}.
+ * @throws IOReactorException in case if a non-recoverable I/O error.
+ */
+ public SocksConnectingIOReactor(IOReactorConfig config, ThreadFactory threadFactory) throws IOReactorException {
+ super(config, threadFactory);
+ }
+
+ /**
+ * Creates an instance of SocksConnectingIOReactor with the given configuration.
+ *
+ * @param config I/O reactor configuration.
+ * Can be {@code null}.
+ * @throws IOReactorException in case if a non-recoverable I/O error.
+ */
+ public SocksConnectingIOReactor(IOReactorConfig config) throws IOReactorException {
+ super(config);
+ }
+
+ /**
+ * Creates an instance of SocksConnectingIOReactor with default configuration.
+ *
+ * @throws IOReactorException in case if a non-recoverable I/O error.
+ */
+ public SocksConnectingIOReactor() throws IOReactorException {
+ super();
+ }
+
+ @Override
+ public void execute(final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
+ super.execute(new SocksIOEventDispatch(eventDispatch));
+ }
+
+}
diff --git a/src/main/java/ai/preferred/venom/socks/SocksHttpRoutePlanner.java b/src/main/java/ai/preferred/venom/socks/SocksHttpRoutePlanner.java
new file mode 100644
index 0000000..fda0a98
--- /dev/null
+++ b/src/main/java/ai/preferred/venom/socks/SocksHttpRoutePlanner.java
@@ -0,0 +1,40 @@
+package ai.preferred.venom.socks;
+
+import com.google.common.annotations.Beta;
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.routing.HttpRoutePlanner;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * This route planners ensures that the connection to https server via socks proxy works. It prevents http client from
+ * tunnelling the IO session twice ({@link SocksIOSessionStrategy} upgrades {@link SocksIOSession} to
+ * {@link org.apache.http.nio.reactor.ssl.SSLIOSession} when necessary).
+ */
+@Beta
+public class SocksHttpRoutePlanner implements HttpRoutePlanner {
+
+ private final HttpRoutePlanner rp;
+
+ /**
+ * Decorates {@link HttpRoutePlanner}.
+ *
+ * @param rp decorated route planner
+ */
+ public SocksHttpRoutePlanner(final HttpRoutePlanner rp) {
+ this.rp = rp;
+ }
+
+ @Override
+ public HttpRoute determineRoute(HttpHost host, HttpRequest request, HttpContext context) throws HttpException {
+ final HttpRoute route = rp.determineRoute(host, request, context);
+ final boolean secure = "https".equalsIgnoreCase(route.getTargetHost().getSchemeName());
+ if (secure && route.getProxyHost() != null && "socks".equalsIgnoreCase(route.getProxyHost().getSchemeName())) {
+ return new HttpRoute(route.getTargetHost(), route.getLocalAddress(), route.getProxyHost(), false);
+ }
+ return route;
+ }
+
+}
diff --git a/src/main/java/ai/preferred/venom/socks/SocksIOEventDispatch.java b/src/main/java/ai/preferred/venom/socks/SocksIOEventDispatch.java
new file mode 100644
index 0000000..4be65d7
--- /dev/null
+++ b/src/main/java/ai/preferred/venom/socks/SocksIOEventDispatch.java
@@ -0,0 +1,118 @@
+package ai.preferred.venom.socks;
+
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.NHttpClientEventHandler;
+import org.apache.http.nio.NHttpConnection;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOSession;
+
+import java.io.IOException;
+
+/**
+ * This class wraps and handles IO dispatch related to {@link SocksIOSession}.
+ */
+public class SocksIOEventDispatch implements IOEventDispatch {
+
+ private final IOEventDispatch dispatch;
+
+ /**
+ * Decorates {@link IOEventDispatch}.
+ *
+ * @param dispatch delegated IO dispatch
+ */
+ public SocksIOEventDispatch(IOEventDispatch dispatch) {
+ this.dispatch = dispatch;
+ }
+
+ @Override
+ public void connected(IOSession session) {
+ dispatch.connected(session);
+ }
+
+ @Override
+ public void inputReady(IOSession session) {
+ try {
+ if (initializeSocksSession(session)) {
+ dispatch.inputReady(session);
+ }
+ } catch (RuntimeException e) {
+ session.shutdown();
+ throw e;
+ }
+ }
+
+ @Override
+ public void outputReady(IOSession session) {
+ try {
+ if (initializeSocksSession(session)) {
+ dispatch.outputReady(session);
+ }
+ } catch (RuntimeException e) {
+ session.shutdown();
+ throw e;
+ }
+ }
+
+ @Override
+ public void timeout(IOSession session) {
+ try {
+ dispatch.timeout(session);
+ final SocksIOSession socksIOSession = getSocksSession(session);
+ if (socksIOSession != null) {
+ socksIOSession.shutdown();
+ }
+ } catch (RuntimeException e) {
+ session.shutdown();
+ throw e;
+ }
+ }
+
+ @Override
+ public void disconnected(IOSession session) {
+ dispatch.disconnected(session);
+ }
+
+ private boolean initializeSocksSession(IOSession session) {
+ final SocksIOSession socksSession = getSocksSession(session);
+ if (socksSession != null) {
+ try {
+ try {
+ if (!socksSession.isInitialized()) {
+ return socksSession.initialize();
+ }
+ } catch (final IOException e) {
+ onException(socksSession, e);
+ throw new RuntimeException(e);
+ }
+ } catch (final RuntimeException e) {
+ socksSession.shutdown();
+ throw e;
+ }
+ }
+ return true;
+ }
+
+ private void onException(IOSession session, Exception ex) {
+ final NHttpClientConnection conn = getConnection(session);
+ if (conn != null) {
+ final NHttpClientEventHandler handler = getEventHandler(conn);
+ if (handler != null) {
+ handler.exception(conn, ex);
+ }
+ }
+ }
+
+ private SocksIOSession getSocksSession(IOSession session) {
+ return (SocksIOSession) session.getAttribute(SocksIOSession.SESSION_KEY);
+ }
+
+ private NHttpClientConnection getConnection(IOSession session) {
+ return (NHttpClientConnection) session.getAttribute(IOEventDispatch.CONNECTION_KEY);
+ }
+
+ private NHttpClientEventHandler getEventHandler(NHttpConnection conn) {
+ return (NHttpClientEventHandler) conn.getContext().getAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
+ }
+
+}
diff --git a/src/main/java/ai/preferred/venom/socks/SocksIOSession.java b/src/main/java/ai/preferred/venom/socks/SocksIOSession.java
new file mode 100644
index 0000000..9621d55
--- /dev/null
+++ b/src/main/java/ai/preferred/venom/socks/SocksIOSession.java
@@ -0,0 +1,305 @@
+package ai.preferred.venom.socks;
+
+import org.apache.http.HttpHost;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.util.InetAddressUtils;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.SessionBufferStatus;
+import org.apache.http.util.Asserts;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The class establishes Socks4a connection and delegates the interface calls to a decorated {@link IOSession}.
+ */
+public class SocksIOSession implements IOSession {
+
+ /**
+ * SOCKS session key.
+ */
+ public static final String SESSION_KEY = "http.session.socks";
+
+ private static final String DEFAULT_USER_ID = "USER";
+
+ private static final byte SOCKS_VERSION = 4;
+ private static final byte CONNECT = 1;
+ private static final byte NULL = 0;
+
+ private static final byte[] DIST_IP_LOOKUP_REQUEST = new byte[]{0, 0, 0, 1};
+
+ private final IOSession innerSession;
+ private final HttpHost targetHost;
+ private final String userId;
+
+ private final SocketAddress remoteAddress;
+
+ private final ByteBuffer replyBuf = ByteBuffer.allocate(8);
+
+ private volatile int status = IOSession.ACTIVE;
+
+ private volatile boolean initialized;
+
+ private boolean connectRequested;
+ private boolean connectReceived;
+
+ private boolean endOfStream = false;
+
+ /**
+ * Decorates {@link IOSession}, sets default user ID for a SOCKS proxy.
+ *
+ * @param innerSession decorated session
+ */
+ public SocksIOSession(final IOSession innerSession) {
+ this(innerSession, DEFAULT_USER_ID);
+ }
+
+ /**
+ * Decorates {@link IOSession}, allows to specify user ID.
+ *
+ * @param innerSession decorated session
+ * @param userId user id as in SOCKS4a specification
+ */
+ public SocksIOSession(final IOSession innerSession, final String userId) {
+ final HttpRoute route = (HttpRoute) innerSession.getAttribute(IOSession.ATTACHMENT_KEY);
+
+ this.innerSession = innerSession;
+ this.targetHost = route.getTargetHost();
+ this.userId = userId;
+
+ if (targetHost.getAddress() != null) {
+ remoteAddress = new InetSocketAddress(targetHost.getAddress(), targetHost.getPort());
+ } else {
+ remoteAddress = InetSocketAddress.createUnresolved(targetHost.getHostName(), targetHost.getPort());
+ }
+
+ innerSession.setAttribute(SESSION_KEY, this);
+ }
+
+ /**
+ * Initializes Socks IO session.
+ *
+ * @return true if Socks IO session is successfully initialized, false - otherwise.
+ * @throws IOException session IO exceptions
+ */
+ synchronized boolean initialize() throws IOException {
+ Asserts.check(!this.initialized, "Socks I/O session already initialized");
+
+ if (innerSession.getStatus() >= IOSession.CLOSING) {
+ return false;
+ }
+
+ if (!connectRequested) {
+ sendConnectRequest();
+ }
+
+ if (!connectReceived) {
+ initialized = receiveConnectReply();
+ }
+
+ return initialized;
+ }
+
+ private void sendConnectRequest() throws IOException {
+ Asserts.check(!this.connectRequested, "Socks CONNECT already sent");
+
+ final boolean isIPv4 = InetAddressUtils.isIPv4Address(targetHost.getHostName());
+
+ final byte[] userId = this.userId.getBytes(StandardCharsets.ISO_8859_1);
+ final byte[] host = targetHost.getHostName().getBytes(StandardCharsets.ISO_8859_1);
+
+ final int size = 9 + userId.length + (isIPv4 ? 0 : host.length + 1);
+
+ final ByteBuffer buf = ByteBuffer.allocate(size);
+ buf.put(SOCKS_VERSION);
+ buf.put(CONNECT);
+ buf.put((byte) ((targetHost.getPort() >> 8) & 0xff));
+ buf.put((byte) (targetHost.getPort() & 0xff));
+ buf.put(isIPv4 ? Inet4Address.getByName(targetHost.getHostName()).getAddress() : DIST_IP_LOOKUP_REQUEST);
+ buf.put(userId);
+ buf.put(NULL);
+ if (!isIPv4) {
+ buf.put(host);
+ buf.put(NULL);
+ }
+ buf.flip();
+
+ if (innerSession.channel().write(buf) != size) {
+ throw new IOException("Could not flush the buffer");
+ }
+
+ connectRequested = true;
+ }
+
+ private boolean receiveConnectReply() throws IOException {
+ final int read = innerSession.channel().read(replyBuf);
+ if (!endOfStream && read == -1) {
+ endOfStream = true;
+ close();
+ throw new IOException("IO channel closed before connection established");
+ }
+
+ if (replyBuf.position() < 8) {
+ return false;
+ }
+
+ replyBuf.flip();
+
+ processConnectReply();
+
+ return true;
+ }
+
+ private void processConnectReply() throws IOException {
+ Asserts.check(!this.connectReceived, "CONNECT reply has been already received");
+ Asserts.check(replyBuf.limit() == 8, "Response is expected of 8 bytes, but got {}", replyBuf.limit());
+
+ byte vn = replyBuf.get();
+
+ Asserts.check(vn == 0 || vn == 4, "Invalid socks version {}", vn);
+
+ IOException ex = null;
+ final byte cd = replyBuf.get();
+ switch (cd) {
+ case 90:
+ break;
+ case 91:
+ ex = new IOException("SOCKS request rejected");
+ break;
+ case 92:
+ ex = new IOException("SOCKS server couldn't reach destination");
+ break;
+ case 93:
+ ex = new IOException("SOCKS authentication failed");
+ break;
+ default:
+ ex = new IOException("Reply from SOCKS server contains bad status");
+ break;
+ }
+
+ if (ex != null) {
+ close();
+ throw ex;
+ }
+
+ connectReceived = true;
+ }
+
+ /**
+ * Checks if the session has been initialized.
+ *
+ * @return true if Socks IO session is successfully initialized, false - otherwise.
+ */
+ boolean isInitialized() {
+ return initialized;
+ }
+
+ @Override
+ public ByteChannel channel() {
+ return innerSession.channel();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return innerSession.getLocalAddress();
+ }
+
+ @Override
+ public int getEventMask() {
+ return innerSession.getEventMask();
+ }
+
+ @Override
+ public void setEventMask(int ops) {
+ innerSession.setEventMask(ops);
+ }
+
+ @Override
+ public void setEvent(int op) {
+ innerSession.setEvent(op);
+ }
+
+ @Override
+ public void clearEvent(int op) {
+ innerSession.clearEvent(op);
+ }
+
+ @Override
+ public synchronized void close() {
+ if (status >= IOSession.CLOSING) {
+ return;
+ }
+ status = IOSession.CLOSED;
+ innerSession.close();
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (status >= IOSession.CLOSING) {
+ return;
+ }
+ status = IOSession.CLOSED;
+ innerSession.shutdown();
+ }
+
+ @Override
+ public int getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return innerSession.isClosed();
+ }
+
+ @Override
+ public int getSocketTimeout() {
+ return innerSession.getSocketTimeout();
+ }
+
+ @Override
+ public void setSocketTimeout(int timeout) {
+ innerSession.setSocketTimeout(timeout);
+ }
+
+ @Override
+ public void setBufferStatus(SessionBufferStatus status) {
+ innerSession.setBufferStatus(status);
+ }
+
+ @Override
+ public boolean hasBufferedInput() {
+ return innerSession.hasBufferedInput();
+ }
+
+ @Override
+ public boolean hasBufferedOutput() {
+ return innerSession.hasBufferedOutput();
+ }
+
+ @Override
+ public void setAttribute(final String name, final Object obj) {
+ innerSession.setAttribute(name, obj);
+ }
+
+ @Override
+ public Object getAttribute(final String name) {
+ return innerSession.getAttribute(name);
+ }
+
+ @Override
+ public Object removeAttribute(final String name) {
+ return innerSession.removeAttribute(name);
+ }
+
+}
diff --git a/src/main/java/ai/preferred/venom/socks/SocksIOSessionStrategy.java b/src/main/java/ai/preferred/venom/socks/SocksIOSessionStrategy.java
new file mode 100644
index 0000000..025293f
--- /dev/null
+++ b/src/main/java/ai/preferred/venom/socks/SocksIOSessionStrategy.java
@@ -0,0 +1,47 @@
+package ai.preferred.venom.socks;
+
+import org.apache.http.HttpHost;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.ssl.SSLIOSession;
+
+import java.io.IOException;
+
+/**
+ * Socks + TSL/SSL layering strategy.
+ */
+public class SocksIOSessionStrategy implements SchemeIOSessionStrategy {
+
+ private final SSLIOSessionStrategy sslioSessionStrategy;
+
+ /**
+ * @param sslioSessionStrategy TSL/SSL strategy
+ */
+ public SocksIOSessionStrategy(final SSLIOSessionStrategy sslioSessionStrategy) {
+ this.sslioSessionStrategy = sslioSessionStrategy;
+ }
+
+ @Override
+ public IOSession upgrade(final HttpHost host, final IOSession session) throws IOException {
+ final HttpRoute route = (HttpRoute) session.getAttribute(IOSession.ATTACHMENT_KEY);
+
+ final SocksIOSession socksSession = new SocksIOSession(session);
+ socksSession.initialize();
+
+ if ("https".equals(route.getTargetHost().getSchemeName())) {
+ final SSLIOSession wrappedSocksSession = sslioSessionStrategy.upgrade(route.getTargetHost(), socksSession);
+ wrappedSocksSession.setAttribute(SocksIOSession.SESSION_KEY, socksSession);
+ return wrappedSocksSession;
+ }
+
+ return socksSession;
+ }
+
+ @Override
+ public boolean isLayeringRequired() {
+ return true;
+ }
+
+}