Skip to content

Commit

Permalink
Merge pull request #357 from IBMStreams/develop
Browse files Browse the repository at this point in the history
Merge latest changes from develop to master
  • Loading branch information
schubon committed Apr 4, 2019
2 parents 7ddeead + b552770 commit 7602bd2
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,12 +573,18 @@ public synchronized void initialize(OperatorContext context)

tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$

tracer.log(TraceLevel.TRACE, "Calling super class initialization"); //$NON-NLS-1$
super.initialize(context);
tracer.log(TraceLevel.TRACE, "Returned from super class initialization"); //$NON-NLS-1$


JmsClasspathUtil.setupClassPaths(context);

// set SSL system properties
if(isSslConnection()) {

tracer.log(TraceLevel.TRACE, "Setting up SSL connection"); //$NON-NLS-1$

if(context.getParameterNames().contains("keyStore"))
System.setProperty("javax.net.ssl.keyStore", getAbsolutePath(getKeyStore()));
if(context.getParameterNames().contains("keyStorePassword"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import java.util.List;
import java.util.logging.Logger;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.NamingException;
import javax.xml.parsers.ParserConfigurationException;

Expand Down Expand Up @@ -727,14 +730,19 @@ public synchronized void initialize(OperatorContext context)

tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$

tracer.log(TraceLevel.TRACE, "Calling super class initialization"); //$NON-NLS-1$
super.initialize(context);
tracer.log(TraceLevel.TRACE, "Returned from super class initialization"); //$NON-NLS-1$

consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class);

JmsClasspathUtil.setupClassPaths(context);

// set SSL system properties
if(isSslConnection()) {

tracer.log(TraceLevel.TRACE, "Setting up SSL connection"); //$NON-NLS-1$

if(context.getParameterNames().contains("keyStore"))
System.setProperty("javax.net.ssl.keyStore", getAbsolutePath(getKeyStore()));
if(context.getParameterNames().contains("keyStorePassword"))
Expand Down Expand Up @@ -1110,7 +1118,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS
jmsDestinationAttrIdx = streamSchema.getAttributeIndex(this.getJmsDestinationOutAttrName());
}
if(jmsDestinationAttrIdx != -1 && msg.getJMSDestination() != null) {
outTuple.setObject(jmsDestinationAttrIdx, new RString(msg.getJMSDestination().toString()));
outTuple.setObject(jmsDestinationAttrIdx, new RString(getDestinationName(msg.getJMSDestination())));
}

if(this.getJmsDeliveryModeOutAttrName() != null) {
Expand Down Expand Up @@ -1159,7 +1167,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS
jmsReplyToAttrIdx = streamSchema.getAttributeIndex(this.getJmsReplyToOutAttrName());
}
if(jmsReplyToAttrIdx != -1 && msg.getJMSReplyTo() != null) {
outTuple.setObject(jmsReplyToAttrIdx, new RString(msg.getJMSReplyTo().toString()));
outTuple.setObject(jmsReplyToAttrIdx, new RString(getDestinationName(msg.getJMSReplyTo())));
}

if(this.getJmsTypeOutAttrName() != null) {
Expand All @@ -1177,7 +1185,39 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS
}
}


/**
* Determines and returns the destination name
*
* @param destination The destination to determine the name for.
* @return The name of a Queue or Topic, or a string representation of the destination object.
* @throws JMSException
*/
private String getDestinationName(Destination destination) throws JMSException {
if (destination instanceof Queue) return ((Queue)destination).getQueueName();
if (destination instanceof Topic) return ((Topic)destination).getTopicName();
return destination.toString();
}


/**
* Handles the property values of the current message.
*
* @param msg The current JMS message.
* @param outTuple The output tuple.
* @throws JMSException
*/
// private void handleJmsMessagePropertyValues(Message msg, OutputTuple outTuple) throws JMSException {
// Enumeration<String> propertyNames = msg.getPropertyNames();
//
// while (propertyNames.hasMoreElements()) {
// String name = propertyNames.nextElement();
//
//
// }
// }


// Send the error message on to the error output port if one is specified
private void sendOutputErrorMsg(String errorMessage) {
OutputTuple errorTuple = errorOutputPort.newTuple();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,77 @@
package com.ibm.streamsx.messaging.jms;

import java.net.MalformedURLException;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.logging.TraceLevel;

public class JmsClasspathUtil {

private static final String CLASS_NAME = "com.ibm.streamsx.messaging.jms.JmsClasspathUtil"; //$NON-NLS-1$
private static final Logger tracer = Logger.getLogger(CLASS_NAME);


public static void setupClassPaths(OperatorContext context) {

tracer.log(TraceLevel.TRACE, "Setting up classpath"); //$NON-NLS-1$

boolean classpathSet = false;


// Dump the provided environment
Map<String,String> sysEnvMap = System.getenv();
Set<String> sysEnvKeys = sysEnvMap.keySet();
tracer.log(TraceLevel.TRACE, "------------------------------------------------------------------------------------"); //$NON-NLS-1$
tracer.log(TraceLevel.TRACE, "--- System Environment used during initialization"); //$NON-NLS-1$
for( String key : sysEnvKeys) {
tracer.log(TraceLevel.TRACE, key + " = " + System.getenv(key)); //$NON-NLS-1$
}
tracer.log(TraceLevel.TRACE, "------------------------------------------------------------------------------------"); //$NON-NLS-1$


String AMQ_HOME = System.getenv("STREAMS_MESSAGING_AMQ_HOME"); //$NON-NLS-1$
if (AMQ_HOME != null) {

tracer.log(TraceLevel.TRACE, "Apache Active MQ classpath!"); //$NON-NLS-1$

String lib = AMQ_HOME + "/lib/*"; //$NON-NLS-1$
String libOptional = AMQ_HOME + "/lib/optional/*"; //$NON-NLS-1$

try {
tracer.log(TraceLevel.TRACE, "Adding class libs to context"); //$NON-NLS-1$
context.addClassLibraries(new String[] { lib, libOptional });
} catch (MalformedURLException e) {

}
catch (MalformedURLException e) {
tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$
}
classpathSet = true;
}

String WMQ_HOME = System.getenv("STREAMS_MESSAGING_WMQ_HOME"); //$NON-NLS-1$
if (WMQ_HOME != null) {

tracer.log(TraceLevel.TRACE, "IBM Websphere MQ classpath!"); //$NON-NLS-1$

String javaLib = WMQ_HOME + "/java/lib/*"; //$NON-NLS-1$
try {
tracer.log(TraceLevel.TRACE, "Adding class libs to context"); //$NON-NLS-1$
context.addClassLibraries(new String[] { javaLib });
} catch (MalformedURLException e) {

}
catch (MalformedURLException e) {
tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$
}
classpathSet = true;
}

if( classpathSet != true ) {
tracer.log(TraceLevel.ERROR, "No classpath has been set!"); //$NON-NLS-1$
}

tracer.log(TraceLevel.TRACE, "Finished setting up classpath!"); //$NON-NLS-1$

}

}
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ The &lt;attribute> element has three possible attributes:
* composite types
* xml
</info:description>
<info:version>5.4.0</info:version>
<info:version>5.4.1</info:version>
<info:requiredProductVersion>4.2.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<groupId>com.ibm.streamsx.messaging</groupId>
<artifactId>streamsx.messaging</artifactId>
<packaging>jar</packaging>
<version>5.4.0</version>
<version>5.4.1</version>
<name>com.ibm.streamsx.messaging</name>
<repositories>
<repository>
Expand Down

0 comments on commit 7602bd2

Please sign in to comment.