#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Post one or more messages to a named mailbox on a server.

The messages are taken from the command line's positional arguments and are
sent over a single sender link using the ``cproton`` driver/engine API.
"""

import sys
from optparse import OptionParser

from cproton import *


class Options(object):
    """Command-line configuration for the post client.

    Attributes set by the constructor:
      messages    -- positional arguments; each one is sent as a message
      server      -- the raw ``--server`` value
      host, port  -- ``server`` split on its last ':' (port defaults to "5672")
      mailbox     -- name of the destination mailbox
      ca_database -- value of ``--ssl-cert-db`` (None when not given)
      verbose     -- true when ``--verbose`` was given
    """

    def __init__(self):
        # NOTE(review): the empty metavar values and the "[:]" in the server
        # help text look like placeholders that lost their text -- confirm
        # the intended wording before changing these user-facing strings.
        parser = OptionParser(usage="usage: %prog [options]  [ ...]")
        parser.add_option("-s", "--server", action="store", type="string",
                          default="0.0.0.0:5672", metavar="",
                          help="Address of the server with syntax: hostname | ip-address [:]")
        parser.add_option("-m", "--mailbox", action="store", type="string",
                          default="mailbox", metavar="",
                          help="Name of the mailbox on the server.")
        parser.add_option("-v", "--verbose", action="store_true",
                          help="Turn on extra trace messages.")

        # SSL configuration
        parser.add_option("--ssl-cert-db", type="str", metavar="",
                          help="database of trusted certificates. Enables use of SSL.")
        # if server wants client authentication:
        #parser.add_option("--ssl-cert-file")
        #parser.add_option("--ssl-key-file")
        #parser.add_option("--ssl-key-pw")

        opts, self.messages = parser.parse_args()   # uses sys.argv[1:]

        self.server = opts.server
        self.mailbox = opts.mailbox
        self.ca_database = opts.ssl_cert_db

        # Split on the LAST ':' only, so "host:port" yields two parts and a
        # bare "host" yields one.
        addr = opts.server.rsplit(":", 1)
        self.host = addr[0]
        if len(addr) == 2:
            self.port = addr[1]
        else:
            self.port = "5672"
        self.verbose = opts.verbose


class PostClient(object):
    """Client that posts messages to a mailbox over one sender link."""

    def __init__(self, host, port, mailbox, ca_database=None):
        """ Initialize the client by supplying the address of the server, and
        the name of the mailbox to post to.  When ca_database is given, SSL
        is enabled in setup() using it as the trusted CA database.
        """
        self.host = host
        self.port = port
        self.mailbox = mailbox
        self.logging = False
        self.ca_database = ca_database
        self.ssl_client = None

        # Handles created by setup().  They are initialised here so that
        # closed() can be called safely before setup() has run.
        self.driver = None
        self.cxtr = None
        self.sasl = None
        self.conn = None
        self.ssn = None
        self.link = None

    def setup(self):
        """ Setup and configure the connection to the server.
        """
        # setup a driver connection to the server
        self.log("Connecting to server host = %s:%s" % (self.host, self.port))
        self.driver = pn_driver()
        self.cxtr = pn_connector(self.driver, self.host, self.port, None)
        assert(self.cxtr)

        # Enable SSL if database of trusted CAs given
        if self.ca_database:
            self.log("Using SSL, CA database = %s" % self.ca_database)

            transport = pn_connector_transport(self.cxtr)
            assert(transport)
            ssl_client = pn_ssl(transport)
            assert(ssl_client)
            # keep the handle on the instance; __init__ declares the attribute
            self.ssl_client = ssl_client
            rc = pn_ssl_set_trusted_ca_db(ssl_client, self.ca_database)
            assert(rc == 0)
            # we want to fail if the server's certificate is invalid:
            rc = pn_ssl_set_peer_authentication(ssl_client, PN_SSL_VERIFY_PEER, None)
            assert(rc == 0)

        # configure SASL
        self.sasl = pn_connector_sasl(self.cxtr)
        pn_sasl_mechanisms(self.sasl, "ANONYMOUS")
        pn_sasl_client(self.sasl)

        # inform the engine about the connection, and link the driver to it.
        self.conn = pn_connection()
        pn_connector_set_connection(self.cxtr, self.conn)

        # create a session, and a sender Link for posting to the mailbox
        self.log("Posting to mailbox = %s" % self.mailbox)
        self.ssn = pn_session(self.conn)
        self.link = pn_sender(self.ssn, "sender")
        dst = pn_link_target(self.link)
        pn_terminus_set_address(dst, self.mailbox)

        # now open all the engine endpoints
        pn_connection_open(self.conn)
        pn_session_open(self.ssn)
        pn_link_open(self.link)

    def teardown(self):
        """ Perform a clean disconnect from the server, and release the
        resources created in setup()
        """
        self.log("Shutting down the connection cleanly...")
        pn_connection_close(self.conn)

        # now wait for the connector to close
        while not pn_connector_closed(self.cxtr):
            self.wait()

        # The SASL handle obtained in setup() is intentionally not freed
        # here; only the link, session, connection and connector are.
        pn_link_free(self.link)
        pn_session_free(self.ssn)
        pn_connection_free(self.conn)
        pn_connector_free(self.cxtr)
        self.log("...Shutdown complete!")

    def wait(self):
        """ Wait for an event to process.
        """
        self.log("Waiting for events...")

        # prepare pending outbound data for the network
        pn_connector_process(self.cxtr)

        # wait forever for network event(s)
        pn_driver_wait(self.driver, -1)

        # process any data that arrived
        pn_connector_process(self.cxtr)

        self.log("...waiting done!")

    def settle(self):
        """ In order to be sure that the remote has accepted the message, we
        need to wait until the message's delivery has been remotely settled.
        Once that occurs, we can release the delivery by settling it.
        """
        d = pn_unsettled_head(self.link)
        while d:
            # fetch the successor first: d may be settled below
            _next = pn_unsettled_next(d)
            # if the remote has either settled this delivery OR set the
            # disposition, we consider the message received.
            disp = pn_delivery_remote_state(d)
            if disp and disp != PN_ACCEPTED:
                print("Warning: message was not accepted by the remote!")
            if disp or pn_delivery_settled(d):
                pn_delivery_settle(d)
            d = _next

    def closed(self):
        """ Return true if there is no connector yet, or the connector has
        closed.
        """
        return self.cxtr is None or pn_connector_closed(self.cxtr)

    def enableLogging(self):
        """ Turn on the trace messages emitted through log(). """
        self.logging = True

    def log(self, msg):
        """ Print msg, but only when logging has been enabled. """
        if self.logging:
            print("%s" % msg)


##################################################
##################################################
##################################################


def main():
    """ Send every message given on the command line to the mailbox.

    Returns 0 on success, -1 when no message data was given, the connection
    failed or closed early, or authentication failed, and -2 when sending a
    message reported an error.
    """
    options = Options()

    if not options.messages:
        print("No message data given!")
        return -1

    sender = PostClient(options.host, options.port, options.mailbox,
                        options.ca_database)
    if options.verbose:
        sender.enableLogging()

    sender.setup()

    # wait until we authenticate with the server
    while pn_sasl_state(sender.sasl) not in (PN_SASL_PASS, PN_SASL_FAIL):
        sender.wait()
        if sender.closed():
            sender.log("connection failed")
            return -1

    if pn_sasl_state(sender.sasl) == PN_SASL_FAIL:
        print("Error: Authentication failure")
        return -1

    # main loop: send each message
    pendingSends = list(options.messages)
    while pendingSends:
        # A closed connection can never deliver the remaining messages;
        # without this check the loop would never terminate.
        if sender.closed():
            print("Error: connection closed before all messages were sent")
            return -1

        # wait until the server grants us some send credit
        if pn_link_credit(sender.link) == 0:
            sender.log("wait for credit")
            sender.wait()

        # Stop as soon as the messages run out: the server may grant more
        # credit than there are messages left to send.
        while (pendingSends
               and pn_link_credit(sender.link) > 0
               and not sender.closed()):
            msg = pendingSends.pop(0)
            sender.log("sending %s" % msg)
            # create the delivery that the following send/advance completes
            pn_delivery(sender.link, "post-delivery-%s" % len(pendingSends))
            rc = pn_link_send(sender.link, msg)
            if rc < 0:
                print("Error: sending message: %s" % rc)
                return -2
            assert rc == len(msg)
            pn_link_advance(sender.link)  # deliver the message

        # settle any deliveries that the server has accepted
        sender.settle()

    # done sending, now block until any pending deliveries are settled
    sender.log("Done sending messages, waiting for deliveries to settle...")
    while pn_link_unsettled(sender.link) > 0 and not sender.closed():
        sender.wait()
        sender.settle()

    # We're done - now clean up the connection:
    sender.teardown()

    return 0


if __name__ == "__main__":
    sys.exit(main())