From 11590667444b47fdb00e5b55d676b4c73f4cc6a0 Mon Sep 17 00:00:00 2001 From: schulz2 Date: Mon, 25 Mar 2019 16:13:36 +0100 Subject: [PATCH 1/5] Add more trace points for JMS initialization --- .../ibm/streamsx/messaging/jms/JMSSink.java | 6 +++++ .../ibm/streamsx/messaging/jms/JMSSource.java | 5 ++++ .../messaging/jms/JmsClasspathUtil.java | 26 ++++++++++++++++--- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index c0e6f92..12749c0 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -573,12 +573,18 @@ public synchronized void initialize(OperatorContext context) tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "Calling super class initialization"); //$NON-NLS-1$ super.initialize(context); + tracer.log(TraceLevel.TRACE, "Returned from super class initialization"); //$NON-NLS-1$ + JmsClasspathUtil.setupClassPaths(context); // set SSL system properties if(isSslConnection()) { + + tracer.log(TraceLevel.TRACE, "Setting up SSL connection"); //$NON-NLS-1$ + if(context.getParameterNames().contains("keyStore")) System.setProperty("javax.net.ssl.keyStore", getAbsolutePath(getKeyStore())); if(context.getParameterNames().contains("keyStorePassword")) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index 7ebe86a..ea94225 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -727,7 +727,9 @@ public synchronized void initialize(OperatorContext context) tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "Calling super class initialization"); //$NON-NLS-1$ super.initialize(context); + tracer.log(TraceLevel.TRACE, "Returned from super class initialization"); //$NON-NLS-1$ consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); @@ -735,6 +737,9 @@ public synchronized void initialize(OperatorContext context) // set SSL system properties if(isSslConnection()) { + + tracer.log(TraceLevel.TRACE, "Setting up SSL connection"); //$NON-NLS-1$ + if(context.getParameterNames().contains("keyStore")) System.setProperty("javax.net.ssl.keyStore", getAbsolutePath(getKeyStore())); if(context.getParameterNames().contains("keyStorePassword")) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java index 2577369..85de3c8 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java @@ -5,32 +5,50 @@ package com.ibm.streamsx.messaging.jms; import java.net.MalformedURLException; +import java.util.logging.Logger; import com.ibm.streams.operator.OperatorContext; +import com.ibm.streams.operator.logging.TraceLevel; public class JmsClasspathUtil { + private static final String CLASS_NAME = "com.ibm.streamsx.messaging.jms.JmsClasspathUtil"; //$NON-NLS-1$ + private static final Logger tracer = Logger.getLogger(CLASS_NAME); + public static void setupClassPaths(OperatorContext context) { + + tracer.log(TraceLevel.TRACE, "Setting up classpath"); //$NON-NLS-1$ + String AMQ_HOME = System.getenv("STREAMS_MESSAGING_AMQ_HOME"); //$NON-NLS-1$ if (AMQ_HOME != null) { + + tracer.log(TraceLevel.TRACE, "Apache Active MQ classpath!"); //$NON-NLS-1$ + String lib = AMQ_HOME + "/lib/*"; //$NON-NLS-1$ String libOptional = AMQ_HOME + "/lib/optional/*"; //$NON-NLS-1$ try { + tracer.log(TraceLevel.TRACE, "Adding class libs to context"); //$NON-NLS-1$ context.addClassLibraries(new String[] { lib, libOptional }); - } catch (MalformedURLException e) { - + } + catch (MalformedURLException e) { + tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$ } } String WMQ_HOME = System.getenv("STREAMS_MESSAGING_WMQ_HOME"); //$NON-NLS-1$ if (WMQ_HOME != null) { + + tracer.log(TraceLevel.TRACE, "IBM Websphere MQ classpath!"); //$NON-NLS-1$ + String javaLib = WMQ_HOME + "/java/lib/*"; //$NON-NLS-1$ try { + tracer.log(TraceLevel.TRACE, "Adding class libs to context"); //$NON-NLS-1$ context.addClassLibraries(new String[] { javaLib }); - } catch (MalformedURLException e) { - + } + catch (MalformedURLException e) { + tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$ } } } From ca54e657898ba3d9baa0756604b1147bf5bb22fc Mon Sep 17 00:00:00 2001 From: schulz2 Date: Mon, 25 Mar 2019 16:40:01 +0100 Subject: [PATCH 2/5] Add even more trace points for JMS initialization --- .../ibm/streamsx/messaging/jms/JmsClasspathUtil.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java index 85de3c8..a9357d3 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java @@ -20,6 +20,8 @@ public static void setupClassPaths(OperatorContext context) { tracer.log(TraceLevel.TRACE, "Setting up classpath"); //$NON-NLS-1$ + boolean classpathSet = false; + String AMQ_HOME = System.getenv("STREAMS_MESSAGING_AMQ_HOME"); //$NON-NLS-1$ if (AMQ_HOME != null) { @@ -35,6 +37,7 @@ public static void setupClassPaths(OperatorContext context) { catch (MalformedURLException e) { tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$ } + classpathSet = true; } String WMQ_HOME = System.getenv("STREAMS_MESSAGING_WMQ_HOME"); //$NON-NLS-1$ @@ -50,7 +53,15 @@ public static void setupClassPaths(OperatorContext context) { catch (MalformedURLException e) { tracer.log(TraceLevel.ERROR, "Failed to add class libs to context: " + e.getMessage()); //$NON-NLS-1$ } + classpathSet = true; + } + + if( classpathSet != true ) { + tracer.log(TraceLevel.ERROR, "No classpath has been set!"); //$NON-NLS-1$ } + + tracer.log(TraceLevel.TRACE, "Finished etting up classpath!"); //$NON-NLS-1$ + } } \ No newline at end of file From ecdffef9afd62f77ea319babc9f229ea6714ea6b Mon Sep 17 00:00:00 2001 From: schulz2 Date: Mon, 25 Mar 2019 17:15:28 +0100 Subject: [PATCH 3/5] Dump the initialization's environment --- .../streamsx/messaging/jms/JmsClasspathUtil.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java index a9357d3..9a75a77 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JmsClasspathUtil.java @@ -5,6 +5,8 @@ package com.ibm.streamsx.messaging.jms; import java.net.MalformedURLException; +import java.util.Map; +import java.util.Set; import java.util.logging.Logger; import com.ibm.streams.operator.OperatorContext; @@ -21,6 +23,18 @@ public static void setupClassPaths(OperatorContext context) { tracer.log(TraceLevel.TRACE, "Setting up classpath"); //$NON-NLS-1$ boolean classpathSet = false; + + + // Dump the provided environment + Map sysEnvMap = System.getenv(); + Set sysEnvKeys = sysEnvMap.keySet(); + tracer.log(TraceLevel.TRACE, "------------------------------------------------------------------------------------"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "--- System Environment used during initialization"); //$NON-NLS-1$ + for( String key : sysEnvKeys) { + tracer.log(TraceLevel.TRACE, key + " = " + System.getenv(key)); //$NON-NLS-1$ + } + tracer.log(TraceLevel.TRACE, "------------------------------------------------------------------------------------"); //$NON-NLS-1$ + String AMQ_HOME = System.getenv("STREAMS_MESSAGING_AMQ_HOME"); //$NON-NLS-1$ if (AMQ_HOME != null) { @@ -60,7 +74,7 @@ public static void setupClassPaths(OperatorContext context) { tracer.log(TraceLevel.ERROR, "No classpath has been set!"); //$NON-NLS-1$ } - tracer.log(TraceLevel.TRACE, "Finished etting up classpath!"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "Finished setting up classpath!"); //$NON-NLS-1$ } From 1102a1718a3d9b0b600eb06aac23464085499c00 Mon Sep 17 00:00:00 2001 From: schulz2 Date: Thu, 28 Mar 2019 14:39:18 +0100 Subject: [PATCH 4/5] Return (simpler) queue/topic name for destination objects (issue #354) For "Destination" header fields (JMSDestination, JMSReplyTo) now the name of the queue or topic is returned instead of a string representation of a destination object. Expect to see "MapQueue" now instead of "queue://MapQueue". --- .../ibm/streamsx/messaging/jms/JMSSource.java | 39 ++++++++++++++++++- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index ea94225..c965395 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -15,8 +15,11 @@ import java.util.List; import java.util.logging.Logger; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; import javax.naming.NamingException; import javax.xml.parsers.ParserConfigurationException; @@ -1115,7 +1118,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS jmsDestinationAttrIdx = streamSchema.getAttributeIndex(this.getJmsDestinationOutAttrName()); } if(jmsDestinationAttrIdx != -1 && msg.getJMSDestination() != null) { - outTuple.setObject(jmsDestinationAttrIdx, new RString(msg.getJMSDestination().toString())); + outTuple.setObject(jmsDestinationAttrIdx, new RString(getDestinationName(msg.getJMSDestination()))); } if(this.getJmsDeliveryModeOutAttrName() != null) { @@ -1164,7 +1167,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS jmsReplyToAttrIdx = streamSchema.getAttributeIndex(this.getJmsReplyToOutAttrName()); } if(jmsReplyToAttrIdx != -1 && msg.getJMSReplyTo() != null) { - outTuple.setObject(jmsReplyToAttrIdx, new RString(msg.getJMSReplyTo().toString())); + outTuple.setObject(jmsReplyToAttrIdx, new RString(getDestinationName(msg.getJMSReplyTo()))); } if(this.getJmsTypeOutAttrName() != null) { @@ -1182,7 +1185,39 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS } } + + /** + * Determines and returns the destination name + * + * @param destination The destination to determine the name for. + * @return The name of a Queue or Topic, or a string representation of the destination object. + * @throws JMSException + */ + private String getDestinationName(Destination destination) throws JMSException { + if (destination instanceof Queue) return ((Queue)destination).getQueueName(); + if (destination instanceof Topic) return ((Topic)destination).getTopicName(); + return destination.toString(); + } + + + /** + * Handles the property values of the current message. + * + * @param msg The current JMS message. + * @param outTuple The output tuple. + * @throws JMSException + */ +// private void handleJmsMessagePropertyValues(Message msg, OutputTuple outTuple) throws JMSException { +// Enumeration propertyNames = msg.getPropertyNames(); +// +// while (propertyNames.hasMoreElements()) { +// String name = propertyNames.nextElement(); +// +// +// } +// } + // Send the error message on to the error output port if one is specified private void sendOutputErrorMsg(String errorMessage) { OutputTuple errorTuple = errorOutputPort.newTuple(); From b552770bdabe3c2d4a9a31d6cc2af6ec9339d4fe Mon Sep 17 00:00:00 2001 From: schulz2 Date: Thu, 28 Mar 2019 14:46:40 +0100 Subject: [PATCH 5/5] Version bump v5.4.0 -> v5.4.1 (issue #354) --- com.ibm.streamsx.messaging/info.xml | 2 +- com.ibm.streamsx.messaging/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 536e111..c663ebf 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -684,7 +684,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.4.0 + 5.4.1 4.2.0.0 diff --git a/com.ibm.streamsx.messaging/pom.xml b/com.ibm.streamsx.messaging/pom.xml index a309b3a..066c405 100644 --- a/com.ibm.streamsx.messaging/pom.xml +++ b/com.ibm.streamsx.messaging/pom.xml @@ -6,7 +6,7 @@ com.ibm.streamsx.messaging streamsx.messaging jar - 5.4.0 + 5.4.1 com.ibm.streamsx.messaging