import oracle.xml.parser.v2.*; import java.sql.*; import oracle.AQ.*; import java.io.*; import org.xml.sax.SAXException; import XMLQueueEmptyException; public class XMLQueue { private Connection conn = null; private AQSession sess = null; private AQQueue queue = null; // Constructing an XMLQueue "binds" its to a particular AQ Queue public XMLQueue(Connection conn, String schema,String queueName) throws AQException,SQLException { this.conn = conn; conn.setAutoCommit(false); // Create the AQ Driver AQOracleDriver driver = new AQOracleDriver(); // Create a new session to work in sess = AQDriverManager.createAQSession(conn); // Get a handle to the requested queue queue = sess.getQueue (schema,queueName); } // Enqueue an XMLDocument to the queue public void enqueue( XMLDocument xmldoc ) throws AQException,IOException, SQLException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); // Print the XML document to serialize it as XML Markup xmldoc.print(baos); // Get the bytes to enqueue into the "Raw" message byte[] messageBytes = baos.toByteArray(); // Ask the queue to create an empty message AQMessage message = queue.createMessage(); // Get a hold of the Raw Payload "bay" to write the message into AQRawPayload payload = message.getRawPayload(); // Set the contents of the payload to be the bytes of our XML Message payload.setStream(messageBytes,messageBytes.length); // Send the new message on its way into the queue queue.enqueue(new AQEnqueueOption(),message); // Sign, seal, and deliver. conn.commit(); } // Dequeue and return an XMLDocument from the queue public XMLDocument dequeue(boolean wait) throws AQException, SQLException, XMLQueueEmptyException { AQDequeueOption dqOpt = new AQDequeueOption(); // If user asked NOT to wait, then set this flag in the Dequeue Options if (!wait) { dqOpt.setWaitTime(AQDequeueOption.WAIT_NONE); } AQMessage message = null; try { // Try to dequeue the message message = queue.dequeue(dqOpt); } catch (oracle.AQ.AQOracleSQLException aqx) { // If we get an error 25228 then queue was empty and we didnt want to wait if (java.lang.Math.abs(aqx.getErrorCode()) == 25228) { throw new XMLQueueEmptyException(); } } // Retrieve the Raw Payload "bay" from the message AQRawPayload payload = message.getRawPayload(); // Create an InputStream on the bytes in the message ByteArrayInputStream bais = new ByteArrayInputStream(payload.getBytes()); XMLDocument dequeuedDoc = null; try { // Parse the XML message dequeuedDoc = XMLHelper.parse(bais,null); } catch (Exception spe) { /* Ignore, doc will be null */ } // Finalize the transactional dequeue operation by committing conn.commit(); // Return the XMLDocument return dequeuedDoc; } } |