svn commit: r589629 [4/7] - in /activemq/activemq-dotnet/trunk: ./ src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/ src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/OpenWire/V1/ src/main/csharp/ActiveMQ/OpenWire/V2/ src/main/csha...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r589629 [4/7] - in /activemq/activemq-dotnet/trunk: ./ src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/ src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/OpenWire/V1/ src/main/csharp/ActiveMQ/OpenWire/V2/ src/main/csha...

chirino-2
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Mon Oct 29 06:55:09 2007
@@ -14,475 +14,521 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using NMS;
 using System;
 using System.Collections;
+using Apache.ActiveMQ.Commands;
+using Apache.NMS;
 
-namespace ActiveMQ {
-        /// <summary>
-        /// Default provider of ISession
-        /// </summary>
-        public class Session : ISession
-        {
-                private Connection connection;
-                private SessionInfo info;
-                private AcknowledgementMode acknowledgementMode;
-                private long consumerCounter;
-                private long producerCounter;
-                private int prefetchSize = 1000;
-                private int maximumPendingMessageLimit;
-                private byte priority;
-                private bool dispatchAsync;
-                private bool exclusive;
-                private bool retroactive;
- private bool asyncSend;
-                private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
-                private TransactionContext transactionContext;
-                private DispatchingThread dispatchingThread;
-                
-                public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
-                {
-                        this.connection = connection;
-                        this.info = info;
-                        this.acknowledgementMode = acknowledgementMode;
- this.asyncSend = connection.AsyncSend;
-                        transactionContext = new TransactionContext(this);
-                        dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
-                        dispatchingThread.ExceptionListener += new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
-                }
-
-                void dispatchingThread_ExceptionListener(Exception exception)
-                {
-                        connection.OnSessionException(this, exception);
-                }
-
-
-                /// <summary>
-                /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
-                /// until acknowledgements are received.
-                /// </summary>
-                public int PrefetchSize {
-                        get { return prefetchSize; }
-                        set { this.prefetchSize = value; }
-                }
-
-                /// <summary>
-                /// Sets the maximum number of messages to keep around per consumer
-                /// in addition to the prefetch window for non-durable topics until messages
-                /// will start to be evicted for slow consumers.
-                /// Must be > 0 to enable this feature
-                /// </summary>
-                public int MaximumPendingMessageLimit {
-                        get { return maximumPendingMessageLimit; }
-                        set { this.maximumPendingMessageLimit = value; }
-                }
-
-                /// <summary>
-                /// Enables or disables whether asynchronous dispatch should be used by the broker
-                /// </summary>
-                public bool DispatchAsync {
-                        get { return dispatchAsync; }
-                        set { this.dispatchAsync = value; }
-                }
-
-                /// <summary>
-                /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
-                /// only one instance of a consumer is allowed to process messages on a queue to preserve order
-                /// </summary>
-                public bool Exclusive {
-                        get { return exclusive; }
-                        set { this.exclusive = value; }
-                }
-
-                /// <summary>
-                /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
-                /// </summary>
-                public bool Retroactive {
-                        get { return retroactive; }
-                        set { this.retroactive = value; }
-                }
-
-                /// <summary>
-                /// Sets the default consumer priority for consumers
-                /// </summary>
-                public byte Priority {
-                        get { return priority; }
-                        set { this.priority = value; }
-                }
-
- /// <summary>
- /// This property indicates whether or not async send is enabled.
- /// </summary>
- public bool AsyncSend
- {
- get { return asyncSend; }
- set { asyncSend = value; }
- }
-
-                public void Dispose()
-                {
-                        connection.DisposeOf(info.SessionId);
-                }
-
-                public IMessageProducer CreateProducer()
-                {
-                        return CreateProducer(null);
-                }
-
-                public IMessageProducer CreateProducer(IDestination destination)
-                {
-                        ProducerInfo command = CreateProducerInfo(destination);
-                        connection.SyncRequest(command);
-                        return new MessageProducer(this, command);
-                }
-
-
-
-                public IMessageConsumer CreateConsumer(IDestination destination)
-                {
-                        return CreateConsumer(destination, null);
-                }
-
-                public IMessageConsumer CreateConsumer(IDestination destination, string selector)
-                {
-                return CreateConsumer(destination, null, false);
- }
-
-                public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
-                {
-                        ConsumerInfo command = CreateConsumerInfo(destination, selector);
-                        command.NoLocal = noLocal;
-
-                        ConsumerId consumerId = command.ConsumerId;
-
-                        try
-                        {
-                                MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
-                                // lets register the consumer first in case we start dispatching messages immediately
-                                connection.AddConsumer(consumerId, consumer);
-
-                                connection.SyncRequest(command);
-
-                                consumers[consumerId] = consumer;
-                                return consumer;
-                        }
-                        catch (Exception e)
-                        {
-                                connection.RemoveConsumer(consumerId);
-                                throw e;
-                        }
-                }
-
-                public IMessageConsumer CreateDurableConsumer(
-                        ITopic destination,
-                        string name,
-                        string selector,
-                        bool noLocal)
-                {
-                        ConsumerInfo command = CreateConsumerInfo(destination, selector);
-                        ConsumerId consumerId = command.ConsumerId;
-                        command.SubscriptionName = name;
-                        command.NoLocal = noLocal;
-
-                        try
-                        {
-                                MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
-                                // lets register the consumer first in case we start dispatching messages immediately
-                                connection.AddConsumer(consumerId, consumer);
-
-                                connection.SyncRequest(command);
-
-                                consumers[consumerId] = consumer;
-                                return consumer;
-                        }
-                        catch (Exception e)
-                        {
-                                connection.RemoveConsumer(consumerId);
-                                throw e;
-                        }
-                }
-
-                public IQueue GetQueue(string name)
-                {
-                        return new ActiveMQQueue(name);
-                }
-
-                public ITopic GetTopic(string name)
-                {
-                        return new ActiveMQTopic(name);
-                }
-
-                public ITemporaryQueue CreateTemporaryQueue()
-                {
-                        ActiveMQTempQueue answer = new ActiveMQTempQueue(connection.CreateTemporaryDestinationName());
-                        CreateTemporaryDestination(answer);
-                        return answer;
-                }
-
-                public ITemporaryTopic CreateTemporaryTopic()
-                {
-                        ActiveMQTempTopic answer = new ActiveMQTempTopic(connection.CreateTemporaryDestinationName());
-                        CreateTemporaryDestination(answer);
-                        return answer;
-                }
-
-                protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
-                {
-                        DestinationInfo command = new DestinationInfo();
-                        command.ConnectionId = connection.ConnectionId;
-                        command.OperationType = 0; // 0 is add
-                        command.Destination = tempDestination;
-
-                        connection.SyncRequest(command);
-                }
-
-                protected void DestroyTemporaryDestination(ActiveMQDestination tempDestination)
-                {
-                        DestinationInfo command = new DestinationInfo();
-                        command.ConnectionId = connection.ConnectionId;
-                        command.OperationType = 1; // 1 is remove
-                        command.Destination = tempDestination;
-
-                        connection.SyncRequest(command);
-                }
-
-
-                public IMessage CreateMessage()
-                {
-                        ActiveMQMessage answer = new ActiveMQMessage();
-                        Configure(answer);
-                        return answer;
-                }
-
-
-                public ITextMessage CreateTextMessage()
-                {
-                        ActiveMQTextMessage answer = new ActiveMQTextMessage();
-                        Configure(answer);
-                        return answer;
-                }
-
-                public ITextMessage CreateTextMessage(string text)
-                {
-                        ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
-                        Configure(answer);
-                        return answer;
-                }
-
-                public IMapMessage CreateMapMessage()
-                {
-                        return new ActiveMQMapMessage();
-                }
-
-                public IBytesMessage CreateBytesMessage()
-                {
-                        return new ActiveMQBytesMessage();
-                }
-
-                public IBytesMessage CreateBytesMessage(byte[] body)
-                {
-                        ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
-                        answer.Content = body;
-                        return answer;
-                }
-
-                public IObjectMessage CreateObjectMessage(object body)
-                {
-                        ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
-                        answer.Body = body;
-                        return answer;
-                }
-
-                public void Commit()
-                {
-                        if (!Transacted)
-                        {
-                                throw new InvalidOperationException(
-                                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-                                                + acknowledgementMode);
-                        }
-                        transactionContext.Commit();
-                }
-
-                public void Rollback()
-                {
-                        if (!Transacted)
-                        {
-                                throw new InvalidOperationException(
-                                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-                                                + acknowledgementMode);
-                        }
-                        transactionContext.Rollback();
-
-                        // lets ensure all the consumers redeliver any rolled back messages
-                        foreach (MessageConsumer consumer in GetConsumers())
-                        {
-                                consumer.RedeliverRolledBackMessages();
-                        }
-                }
-
-
-
-                // Properties
-
-                public Connection Connection {
-                        get { return connection; }
-                }
-
-                public SessionId SessionId {
-                        get { return info.SessionId; }
-                }
-
-                public AcknowledgementMode AcknowledgementMode {
-                        get { return acknowledgementMode; }
-                }
-
-                public bool Transacted {
-                        get { return acknowledgementMode == AcknowledgementMode.Transactional; }
-                }
-
-                public TransactionContext TransactionContext {
-                        get { return transactionContext; }
-                }
-
-                // Implementation methods
- public void DoSend(ActiveMQMessage message)
-                {
- if (AsyncSend)
- {
- connection.OneWay(message);
- }
- else
- {
- connection.SyncRequest(message);
- }
-                }
-
-                public void Close()
-                {
-                        // To do: what about session id?
-                        StopAsyncDelivery();
-                }
-                
-                /// <summary>
-                /// Ensures that a transaction is started
-                /// </summary>
-                public void DoStartTransaction()
-                {
-                        if (Transacted)
-                        {
-                                transactionContext.Begin();
-                        }
-                }
-
-                public void DisposeOf(ConsumerId objectId)
-                {
-                        consumers.Remove(objectId);
-                        connection.RemoveConsumer(objectId);
-                        connection.DisposeOf(objectId);
-                }
-
-                /// <summary>
-                /// Private method called by the dispatcher thread in order to perform
-                /// asynchronous delivery of queued (inbound) messages.
-                /// </summary>
-                private void DispatchAsyncMessages()
-                {
-                        // lets iterate through each consumer created by this session
-                        // ensuring that they have all pending messages dispatched
-                        foreach (MessageConsumer consumer in GetConsumers())
-                        {
-                                consumer.DispatchAsyncMessages();
-                        }
-                }
-
-
-                /// <summary>
-                /// Returns a copy of the current consumers in a thread safe way to avoid concurrency
-                /// problems if the consumers are changed in another thread
-                /// </summary>
-                protected ICollection GetConsumers()
-                {
-                        lock (consumers.SyncRoot)
-                        {
-                                return new ArrayList(consumers.Values);
-                        }
-                }
-
-                protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
-                {
-                        ConsumerInfo answer = new ConsumerInfo();
-                        ConsumerId id = new ConsumerId();
-                        id.ConnectionId = info.SessionId.ConnectionId;
-                        id.SessionId = info.SessionId.Value;
-                        lock (this)
-                        {
-                                id.Value = ++consumerCounter;
-                        }
-                        answer.ConsumerId = id;
-                        answer.Destination = ActiveMQDestination.Transform(destination);
-                        answer.Selector = selector;
-                        answer.PrefetchSize = prefetchSize;
-                        answer.Priority = priority;
-                        answer.Exclusive = exclusive;
-                        answer.DispatchAsync = dispatchAsync;
-                        answer.Retroactive = retroactive;
-
-                        // If the destination contained a URI query, then use it to set public properties
-                        // on the ConsumerInfo
-                        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-                        if (amqDestination != null && amqDestination.Options != null)
-                        {
-                                Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
-                        }
-
-                        return answer;
-                }
-
-                protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
-                {
-                        ProducerInfo answer = new ProducerInfo();
-                        ProducerId id = new ProducerId();
-                        id.ConnectionId = info.SessionId.ConnectionId;
-                        id.SessionId = info.SessionId.Value;
-                        lock (this)
-                        {
-                                id.Value = ++producerCounter;
-                        }
-                        answer.ProducerId = id;
-                        answer.Destination = ActiveMQDestination.Transform(destination);
-
-                        answer.Destination = ActiveMQDestination.Transform(destination);
-
-                        // If the destination contained a URI query, then use it to set public
-                        // properties on the ProducerInfo
-                        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-                        if (amqDestination != null && amqDestination.Options != null)
-                        {
-                                Util.URISupport.SetProperties(answer, amqDestination.Options, "producer.");
-                        }
-
-                        return answer;
-                }
-
-                /// <summary>
-                /// Configures the message command
-                /// </summary>
-                protected void Configure(ActiveMQMessage message)
-                {
-                }
-
- internal void StopAsyncDelivery()
- {
- dispatchingThread.Stop();
- }
-
- internal void StartAsyncDelivery(Dispatcher dispatcher)
- {
- if(dispatcher != null)
- dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
- dispatchingThread.Start();
- }
-        }
-}
+namespace Apache.ActiveMQ
+{
+ /// <summary>
+ /// Default provider of ISession
+ /// </summary>
+ public class Session : ISession
+ {
+ private AcknowledgementMode acknowledgementMode;
+ private bool asyncSend;
+ private Connection connection;
+ private long consumerCounter;
+ private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+ private bool dispatchAsync;
+ private DispatchingThread dispatchingThread;
+ private bool exclusive;
+ private SessionInfo info;
+ private int maximumPendingMessageLimit;
+ private int prefetchSize = 1000;
+ private byte priority;
+ private long producerCounter;
+ private bool retroactive;
+ private TransactionContext transactionContext;
+ private bool disposed = false;
+
+ public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
+ {
+ this.connection = connection;
+ this.info = info;
+ this.acknowledgementMode = acknowledgementMode;
+ this.asyncSend = connection.AsyncSend;
+ transactionContext = new TransactionContext(this);
+ dispatchingThread =
+ new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
+ dispatchingThread.ExceptionListener +=
+ new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+ }
+
+ ~Session()
+ {
+ Dispose(false);
+ }
+
+ /// <summary>
+ /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
+ /// until acknowledgements are received.
+ /// </summary>
+ public int PrefetchSize
+ {
+ get { return prefetchSize; }
+ set { this.prefetchSize = value; }
+ }
+
+ /// <summary>
+ /// Sets the maximum number of messages to keep around per consumer
+ /// in addition to the prefetch window for non-durable topics until messages
+ /// will start to be evicted for slow consumers.
+ /// Must be > 0 to enable this feature
+ /// </summary>
+ public int MaximumPendingMessageLimit
+ {
+ get { return maximumPendingMessageLimit; }
+ set { this.maximumPendingMessageLimit = value; }
+ }
+
+ /// <summary>
+ /// Enables or disables whether asynchronous dispatch should be used by the broker
+ /// </summary>
+ public bool DispatchAsync
+ {
+ get { return dispatchAsync; }
+ set { this.dispatchAsync = value; }
+ }
+
+ /// <summary>
+ /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
+ /// only one instance of a consumer is allowed to process messages on a queue to preserve order
+ /// </summary>
+ public bool Exclusive
+ {
+ get { return exclusive; }
+ set { this.exclusive = value; }
+ }
+
+ /// <summary>
+ /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
+ /// </summary>
+ public bool Retroactive
+ {
+ get { return retroactive; }
+ set { this.retroactive = value; }
+ }
+
+ /// <summary>
+ /// Sets the default consumer priority for consumers
+ /// </summary>
+ public byte Priority
+ {
+ get { return priority; }
+ set { this.priority = value; }
+ }
+
+ /// <summary>
+ /// This property indicates whether or not async send is enabled.
+ /// </summary>
+ public bool AsyncSend
+ {
+ get { return asyncSend; }
+ set { asyncSend = value; }
+ }
+
+ public Connection Connection
+ {
+ get { return connection; }
+ }
+
+ public SessionId SessionId
+ {
+ get { return info.SessionId; }
+ }
+
+ public TransactionContext TransactionContext
+ {
+ get { return transactionContext; }
+ }
+
+ #region ISession Members
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Releases this session at most once: asks the connection to dispose of the
+ /// session id, then marks the session as disposed so later calls return early.
+ /// </summary>
+ /// <param name="disposing">
+ /// true when invoked from Dispose(); false when invoked from the finalizer.
+ /// </param>
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ // Dispose managed code here.
+ }
+
+ // NOTE(review): this call is made on both paths, including the finalizer
+ // (disposing == false), where the connection object may already have been
+ // finalized -- confirm that is intended.
+ try
+ {
+ connection.DisposeOf(info.SessionId);
+ }
+ catch
+ {
+ // Ignore network errors.
+ // NOTE(review): the bare catch swallows every exception type, not only
+ // network failures.
+ }
+
+ disposed = true;
+ }
+
+ public IMessageProducer CreateProducer()
+ {
+ return CreateProducer(null);
+ }
+
+ public IMessageProducer CreateProducer(IDestination destination)
+ {
+ ProducerInfo command = CreateProducerInfo(destination);
+ connection.SyncRequest(command);
+ return new MessageProducer(this, command);
+ }
+
+
+ public IMessageConsumer CreateConsumer(IDestination destination)
+ {
+ return CreateConsumer(destination, null);
+ }
+
+ public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+ {
+ return CreateConsumer(destination, selector, false);
+ }
+
+ /// <summary>
+ /// Creates a consumer for the given destination. The consumer is registered with
+ /// the connection before the ConsumerInfo command is sent, and is only added to
+ /// this session's consumer table once the synchronous request has succeeded.
+ /// </summary>
+ /// <param name="destination">Destination to consume from.</param>
+ /// <param name="selector">Selector copied onto the ConsumerInfo command; may be null.</param>
+ /// <param name="noLocal">Value assigned to the command's NoLocal flag.</param>
+ /// <returns>The newly created consumer.</returns>
+ public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+ {
+     ConsumerInfo command = CreateConsumerInfo(destination, selector);
+     command.NoLocal = noLocal;
+     command.AcknowledgementMode = acknowledgementMode;
+
+     ConsumerId consumerId = command.ConsumerId;
+
+     try
+     {
+         MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+         // lets register the consumer first in case we start dispatching messages immediately
+         connection.AddConsumer(consumerId, consumer);
+
+         connection.SyncRequest(command);
+
+         consumers[consumerId] = consumer;
+         return consumer;
+     }
+     catch(Exception)
+     {
+         // Undo the early registration, then rethrow with "throw;" so the
+         // original stack trace is kept ("throw e;" would reset it).
+         connection.RemoveConsumer(consumerId);
+         throw;
+     }
+ }
+
+ /// <summary>
+ /// Creates a consumer for a topic with a subscription name. The consumer is
+ /// registered with the connection before the ConsumerInfo command is sent, and is
+ /// only added to this session's consumer table once the request has succeeded.
+ /// </summary>
+ /// <param name="destination">Topic to consume from.</param>
+ /// <param name="name">Value assigned to the command's SubscriptionName.</param>
+ /// <param name="selector">Selector copied onto the ConsumerInfo command; may be null.</param>
+ /// <param name="noLocal">Value assigned to the command's NoLocal flag.</param>
+ /// <returns>The newly created consumer.</returns>
+ public IMessageConsumer CreateDurableConsumer(
+     ITopic destination,
+     string name,
+     string selector,
+     bool noLocal)
+ {
+     ConsumerInfo command = CreateConsumerInfo(destination, selector);
+     ConsumerId consumerId = command.ConsumerId;
+     command.SubscriptionName = name;
+     command.NoLocal = noLocal;
+     // Keep the command in step with CreateConsumer, which also records the
+     // session's acknowledgement mode on the ConsumerInfo.
+     command.AcknowledgementMode = acknowledgementMode;
+
+     try
+     {
+         MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+         // lets register the consumer first in case we start dispatching messages immediately
+         connection.AddConsumer(consumerId, consumer);
+
+         connection.SyncRequest(command);
+
+         consumers[consumerId] = consumer;
+         return consumer;
+     }
+     catch(Exception)
+     {
+         // Undo the early registration, then rethrow with "throw;" so the
+         // original stack trace is kept ("throw e;" would reset it).
+         connection.RemoveConsumer(consumerId);
+         throw;
+     }
+ }
+
+ public IQueue GetQueue(string name)
+ {
+ return new ActiveMQQueue(name);
+ }
+
+ public ITopic GetTopic(string name)
+ {
+ return new ActiveMQTopic(name);
+ }
+
+ public ITemporaryQueue CreateTemporaryQueue()
+ {
+ ActiveMQTempQueue answer = new ActiveMQTempQueue(connection.CreateTemporaryDestinationName());
+ CreateTemporaryDestination(answer);
+ return answer;
+ }
+
+ public ITemporaryTopic CreateTemporaryTopic()
+ {
+ ActiveMQTempTopic answer = new ActiveMQTempTopic(connection.CreateTemporaryDestinationName());
+ CreateTemporaryDestination(answer);
+ return answer;
+ }
+
+
+ public IMessage CreateMessage()
+ {
+ ActiveMQMessage answer = new ActiveMQMessage();
+ Configure(answer);
+ return answer;
+ }
+
+
+ public ITextMessage CreateTextMessage()
+ {
+ ActiveMQTextMessage answer = new ActiveMQTextMessage();
+ Configure(answer);
+ return answer;
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
+ Configure(answer);
+ return answer;
+ }
+
+ public IMapMessage CreateMapMessage()
+ {
+ return new ActiveMQMapMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ return new ActiveMQBytesMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
+ answer.Content = body;
+ return answer;
+ }
+
+ public IObjectMessage CreateObjectMessage(object body)
+ {
+ ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+ answer.Body = body;
+ return answer;
+ }
+
+ public void Commit()
+ {
+ if(!Transacted)
+ {
+ throw new InvalidOperationException(
+ "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+ + acknowledgementMode);
+ }
+ transactionContext.Commit();
+ }
+
+ /// <summary>
+ /// Rolls back the session's transaction context and then asks every consumer of
+ /// this session to redeliver its rolled back messages.
+ /// </summary>
+ /// <exception cref="InvalidOperationException">
+ /// The session's acknowledgement mode is not Transactional.
+ /// </exception>
+ public void Rollback()
+ {
+     if(!Transacted)
+     {
+         // Name this operation in the message so the failure can be told apart
+         // from the equivalent check in Commit().
+         throw new InvalidOperationException(
+             "You cannot perform a Rollback() on a non-transacted session. Acknowlegement mode is: "
+             + acknowledgementMode);
+     }
+     transactionContext.Rollback();
+
+     // lets ensure all the consumers redeliver any rolled back messages
+     foreach(MessageConsumer consumer in GetConsumers())
+     {
+         consumer.RedeliverRolledBackMessages();
+     }
+ }
+
+
+ // Properties
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ }
+
+ public bool Transacted
+ {
+ get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+ }
+
+ /// <summary>
+ /// Closes the session by stopping asynchronous delivery on its dispatching thread.
+ /// </summary>
+ public void Close()
+ {
+ // To do: what about session id?
+ // NOTE(review): nothing here touches the consumers table or notifies the
+ // connection -- confirm that cleanup happens elsewhere (e.g. via Dispose).
+ StopAsyncDelivery();
+ }
+
+ #endregion
+
+ private void dispatchingThread_ExceptionListener(Exception exception)
+ {
+ connection.OnSessionException(this, exception);
+ }
+
+ protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+ {
+ DestinationInfo command = new DestinationInfo();
+ command.ConnectionId = connection.ConnectionId;
+ command.OperationType = 0; // 0 is add
+ command.Destination = tempDestination;
+
+ connection.SyncRequest(command);
+ }
+
+ protected void DestroyTemporaryDestination(ActiveMQDestination tempDestination)
+ {
+ DestinationInfo command = new DestinationInfo();
+ command.ConnectionId = connection.ConnectionId;
+ command.OperationType = 1; // 1 is remove
+ command.Destination = tempDestination;
+
+ connection.SyncRequest(command);
+ }
+
+ /// <summary>
+ /// Hands the message to the connection: as a one-way command when AsyncSend is
+ /// set, otherwise as a synchronous request.
+ /// </summary>
+ /// <param name="message">The message command to send.</param>
+ public void DoSend(ActiveMQMessage message)
+ {
+     if(!AsyncSend)
+     {
+         connection.SyncRequest(message);
+         return;
+     }
+
+     connection.OneWay(message);
+ }
+
+ /// <summary>
+ /// Ensures that a transaction is started
+ /// </summary>
+ public void DoStartTransaction()
+ {
+ if(Transacted)
+ {
+ transactionContext.Begin();
+ }
+ }
+
+ public void DisposeOf(ConsumerId objectId)
+ {
+ consumers.Remove(objectId);
+ connection.RemoveConsumer(objectId);
+ connection.DisposeOf(objectId);
+ }
+
+ /// <summary>
+ /// Private method called by the dispatcher thread in order to perform
+ /// asynchronous delivery of queued (inbound) messages.
+ /// </summary>
+ private void DispatchAsyncMessages()
+ {
+ // lets iterate through each consumer created by this session
+ // ensuring that they have all pending messages dispatched
+ foreach(MessageConsumer consumer in GetConsumers())
+ {
+ consumer.DispatchAsyncMessages();
+ }
+ }
+
+
+ /// <summary>
+ /// Returns a copy of the current consumers in a thread safe way to avoid concurrency
+ /// problems if the consumers are changed in another thread
+ /// </summary>
+ protected ICollection GetConsumers()
+ {
+ lock(consumers.SyncRoot)
+ {
+ return new ArrayList(consumers.Values);
+ }
+ }
+
+ /// <summary>
+ /// Builds the ConsumerInfo command for a new consumer: assigns it the next
+ /// consumer id of this session, copies the session-level consumer settings onto
+ /// it, and finally applies any "consumer."-prefixed options carried by the
+ /// destination.
+ /// </summary>
+ /// <param name="destination">Destination the consumer will read from.</param>
+ /// <param name="selector">Selector stored on the command; may be null.</param>
+ /// <returns>The populated ConsumerInfo command.</returns>
+ protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
+ {
+     ConsumerInfo answer = new ConsumerInfo();
+     ConsumerId id = new ConsumerId();
+     id.ConnectionId = info.SessionId.ConnectionId;
+     id.SessionId = info.SessionId.Value;
+     // The counter is bumped under the lock so two threads cannot be handed
+     // the same consumer id value.
+     lock(this)
+     {
+         id.Value = ++consumerCounter;
+     }
+     answer.ConsumerId = id;
+     answer.Destination = ActiveMQDestination.Transform(destination);
+     answer.Selector = selector;
+     answer.PrefetchSize = prefetchSize;
+     // Forward the pending-message limit as well; without this the session's
+     // MaximumPendingMessageLimit property is stored but never takes effect.
+     answer.MaximumPendingMessageLimit = maximumPendingMessageLimit;
+     answer.Priority = priority;
+     answer.Exclusive = exclusive;
+     answer.DispatchAsync = dispatchAsync;
+     answer.Retroactive = retroactive;
+
+     // If the destination contained a URI query, then use it to set public properties
+     // on the ConsumerInfo. This runs last so destination options are applied after
+     // the session-level values above.
+     ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+     if(amqDestination != null && amqDestination.Options != null)
+     {
+         Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
+     }
+
+     return answer;
+ }
+
+ /// <summary>
+ /// Builds the ProducerInfo command for a new producer: assigns it the next
+ /// producer id of this session, sets the transformed destination, and applies
+ /// any "producer."-prefixed options carried by the destination.
+ /// </summary>
+ /// <param name="destination">Destination the producer will send to.</param>
+ /// <returns>The populated ProducerInfo command.</returns>
+ protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+ {
+     ProducerInfo producerInfo = new ProducerInfo();
+     ProducerId producerId = new ProducerId();
+     producerId.ConnectionId = info.SessionId.ConnectionId;
+     producerId.SessionId = info.SessionId.Value;
+     lock(this)
+     {
+         producerCounter = producerCounter + 1;
+         producerId.Value = producerCounter;
+     }
+     producerInfo.ProducerId = producerId;
+     producerInfo.Destination = ActiveMQDestination.Transform(destination);
+
+     // Only an ActiveMQDestination that carries options can contribute
+     // extra public properties to the ProducerInfo.
+     ActiveMQDestination withOptions = destination as ActiveMQDestination;
+     if(withOptions == null || withOptions.Options == null)
+     {
+         return producerInfo;
+     }
+
+     Util.URISupport.SetProperties(producerInfo, withOptions.Options, "producer.");
+     return producerInfo;
+ }
+
+ /// <summary>
+ /// Configures the message command
+ /// </summary>
+ protected void Configure(ActiveMQMessage message)
+ {
+ }
+
+ internal void StopAsyncDelivery()
+ {
+ dispatchingThread.Stop();
+ }
+
+ /// <summary>
+ /// Starts the dispatching thread. When a dispatcher is supplied, it is first
+ /// given the dispatching thread's event handle via SetAsyncDelivery.
+ /// </summary>
+ /// <param name="dispatcher">Dispatcher to wire up; may be null.</param>
+ internal void StartAsyncDelivery(Dispatcher dispatcher)
+ {
+     bool hasDispatcher = dispatcher != null;
+     if(hasDispatcher)
+     {
+         dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
+     }
+
+     dispatchingThread.Start();
+ }
+ }
+}
\ No newline at end of file

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs Mon Oct 29 06:55:09 2007
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ;
-using ActiveMQ.Commands;
+using Apache.ActiveMQ;
+using Apache.ActiveMQ.Commands;
 using System.Collections;
 
 
-namespace ActiveMQ
+namespace Apache.ActiveMQ
 {
  public enum TransactionType
     {
@@ -27,7 +27,7 @@
     }
 }
 
-namespace ActiveMQ
+namespace Apache.ActiveMQ
 {
  public class TransactionContext
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs Mon Oct 29 06:55:09 2007
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
+using Apache.ActiveMQ.Commands;
 using System;
 using System.Threading;
-using ActiveMQ.Util;
+using Apache.ActiveMQ.Util;
+using Apache.NMS;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 
  /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.NMS;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
  public delegate void CommandHandler(ITransport sender, Command command);
  public delegate void ExceptionHandler(ITransport sender, Exception command);

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
  public interface ITransportFactory
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 using System;
 using System.IO;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
  /// <summary>
  /// Represents the marshalling of commands to and from an IO stream

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 
  /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs Mon Oct 29 06:55:09 2007
@@ -18,10 +18,11 @@
 using System;
 using System.Collections;
 
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
+using Apache.NMS;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs Mon Oct 29 06:55:09 2007
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 using System.Reflection;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire.V1;
-using ActiveMQ.Transport;
-using NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire.V1;
+using Apache.ActiveMQ.Transport;
+using Apache.NMS;
 using System;
 using System.Collections;
 using System.IO;
 using System.Text;
 
-namespace ActiveMQ.Transport.Stomp
+namespace Apache.ActiveMQ.Transport.Stomp
 {
     /// <summary>
     /// A Stream for writing a <a href="http://stomp.codehaus.org/">STOMP</a> Frame
@@ -103,14 +103,9 @@
  {
  ds.Write(content);
  }
-
- // if no content length then lets write a null
- if (contentLength < 0)
- {
- ds.Write(NULL);
- }
- }
 
-
+ // Always write a terminating NULL byte to end the content frame.
+ ds.Write(NULL);
+ }
     }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs Mon Oct 29 06:55:09 2007
@@ -1,194 +1,261 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System.Reflection;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire.V1;
-using ActiveMQ.Transport;
-using NMS;
-using System;
-using System.Collections;
-using System.IO;
-using System.Text;
-
-namespace ActiveMQ.Transport.Stomp
-{
-    /// <summary>
-    /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
-    /// </summary>
-    public class StompHelper
-    {
-
-
- public static ActiveMQDestination ToDestination(string text)
- {
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using Apache.ActiveMQ.Commands;
+using Apache.NMS;
+
+namespace Apache.ActiveMQ.Transport.Stomp
+{
+    /// <summary>
+    /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
+    /// </summary>
+    public class StompHelper
+    {
+
+
+ public static ActiveMQDestination ToDestination(string text)
+ {
     if( text == null )
     {
-                return null;
-    }    
- int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
- if (text.StartsWith("/queue/"))
- {
- text = text.Substring("/queue/".Length);
- }
- else if (text.StartsWith("/topic/"))
- {
- text = text.Substring("/topic/".Length);
- type = ActiveMQDestination.ACTIVEMQ_TOPIC;
- }
- else if (text.StartsWith("/temp-topic/"))
- {
- text = text.Substring("/temp-topic/".Length);
- type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
- }
- else if (text.StartsWith("/temp-queue/"))
- {
- text = text.Substring("/temp-queue/".Length);
- type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
- }
- return ActiveMQDestination.CreateDestination(type, text);
- }
-
- public static string ToStomp(ActiveMQDestination destination)
- {
- if (destination == null)
- {
- return null;
- }
- else
- {
- switch (destination.DestinationType)
- {
- case DestinationType.Topic:
- return "/topic/" + destination.PhysicalName;
-
- case DestinationType.TemporaryTopic:
- return "/temp-topic/" + destination.PhysicalName;
-
- case DestinationType.TemporaryQueue:
- return "/temp-queue/" + destination.PhysicalName;
-
- default:
- return "/queue/" + destination.PhysicalName;
- }
- }
- }
-
- public static string ToStomp(ConsumerId id)
- {
- return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
- }
-
- public static ConsumerId ToConsumerId(string text)
- {
- if (text == null)
- {
- return null;
- }
- ConsumerId answer = new ConsumerId();
- int idx = text.LastIndexOf(':');
- if (idx >= 0) {
- answer.Value = Int32.Parse(text.Substring(idx + 1));
- text = text.Substring(0, idx);
- idx = text.LastIndexOf(':');
- if (idx >= 0) {
- answer.SessionId = Int32.Parse(text.Substring(idx + 1));
- text = text.Substring(0, idx);
- }
- }
- answer.ConnectionId = text;
- return answer;
- }
-
- public static string ToStomp(ProducerId id)
- {
- return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
- }
-
- public static ProducerId ToProducerId(string text)
- {
- if (text == null)
- {
- return null;
- }
- ProducerId answer = new ProducerId();
- int idx = text.LastIndexOf(':');
- if (idx >= 0) {
- answer.Value = Int32.Parse(text.Substring(idx + 1));
- text = text.Substring(0, idx);
- idx = text.LastIndexOf(':');
- if (idx >= 0) {
- answer.SessionId = Int32.Parse(text.Substring(idx + 1));
- text = text.Substring(0, idx);
- }
- }
- answer.ConnectionId = text;
- return answer;
- }
-
- public static string ToStomp(MessageId id)
- {
- return ToStomp(id.ProducerId) + ":" + id.BrokerSequenceId + ":" + id.ProducerSequenceId;
- }
-
- public static MessageId ToMessageId(string text)
- {
- if (text == null)
- {
- return null;
- }
- MessageId answer = new MessageId();
- int idx = text.LastIndexOf(':');
- if (idx >= 0) {
- answer.ProducerSequenceId = Int32.Parse(text.Substring(idx + 1));
- text = text.Substring(0, idx);
- idx = text.LastIndexOf(':');
- if (idx >= 0) {
- answer.BrokerSequenceId = Int32.Parse(text.Substring(idx + 1));
- text = text.Substring(0, idx);
- }
- }
- answer.ProducerId = ToProducerId(text);
- return answer;
- }
-
- public static string ToStomp(TransactionId id)
- {
- if (id is LocalTransactionId)
- {
- return ToStomp(id as LocalTransactionId);
- }
- return id.ToString();
- }
-
- public static string ToStomp(LocalTransactionId transactionId)
- {
- return transactionId.ConnectionId.Value + ":" + transactionId.Value;
- }
-
- public static bool ToBool(string text, bool defaultValue)
- {
- if (text == null)
- {
- return defaultValue;
- }
- else
- {
- return "true" == text || "TRUE" == text;
- }
- }
-    }
-}
+                return null;
+    }    
+ int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
+ if (text.StartsWith("/queue/"))
+ {
+ text = text.Substring("/queue/".Length);
+ }
+ else if (text.StartsWith("/topic/"))
+ {
+ text = text.Substring("/topic/".Length);
+ type = ActiveMQDestination.ACTIVEMQ_TOPIC;
+ }
+ else if (text.StartsWith("/temp-topic/"))
+ {
+ text = text.Substring("/temp-topic/".Length);
+ type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+ }
+ else if (text.StartsWith("/temp-queue/"))
+ {
+ text = text.Substring("/temp-queue/".Length);
+ type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+ }
+ return ActiveMQDestination.CreateDestination(type, text);
+ }
+
+ /// <summary>
+ /// Converts a destination into its STOMP name: the physical name prefixed
+ /// with "/topic/", "/temp-topic/", "/temp-queue/" or "/queue/".
+ /// </summary>
+ /// <param name="destination">The destination to convert; may be null.</param>
+ /// <returns>The prefixed name, or null when <paramref name="destination"/> is null.</returns>
+ public static string ToStomp(ActiveMQDestination destination)
+ {
+     if (destination == null)
+     {
+         return null;
+     }
+
+     // Any destination type without its own case is written as a queue.
+     string prefix;
+     switch (destination.DestinationType)
+     {
+         case DestinationType.Topic:
+             prefix = "/topic/";
+             break;
+
+         case DestinationType.TemporaryTopic:
+             prefix = "/temp-topic/";
+             break;
+
+         case DestinationType.TemporaryQueue:
+             prefix = "/temp-queue/";
+             break;
+
+         default:
+             prefix = "/queue/";
+             break;
+     }
+
+     return prefix + destination.PhysicalName;
+ }
+
+ /// <summary>
+ /// Formats a consumer id as "ConnectionId:SessionId:Value".
+ /// </summary>
+ /// <param name="id">The consumer id to format.</param>
+ /// <returns>The three parts joined with ':' separators.</returns>
+ public static string ToStomp(ConsumerId id)
+ {
+     StringBuilder consumerBuilder = new StringBuilder();
+
+     consumerBuilder.Append(id.ConnectionId);
+     consumerBuilder.Append(":").Append(id.SessionId);
+     consumerBuilder.Append(":").Append(id.Value);
+
+     return consumerBuilder.ToString();
+ }
+
+ /// <summary>
+ /// Parses a consumer id from text of the form "ConnectionId:SessionId:Value".
+ /// The text is read from the right: the last ':'-separated segment is the
+ /// value and the one before it the session id. A segment that does not
+ /// parse as an integer is logged through Tracer.Debug and stays part of
+ /// the connection id.
+ /// </summary>
+ /// <param name="text">The text to parse; may be null.</param>
+ /// <returns>The parsed id, or null when <paramref name="text"/> is null.</returns>
+ public static ConsumerId ToConsumerId(string text)
+ {
+     if (text == null)
+     {
+         return null;
+     }
+
+     ConsumerId consumerId = new ConsumerId();
+     int separator = text.LastIndexOf(':');
+     if (separator >= 0)
+     {
+         try
+         {
+             // Rightmost segment: the consumer value.
+             string valueText = text.Substring(separator + 1);
+             consumerId.Value = Int32.Parse(valueText);
+             text = text.Substring(0, separator);
+
+             // Next segment to the left: the session id.
+             separator = text.LastIndexOf(':');
+             if (separator >= 0)
+             {
+                 try
+                 {
+                     string sessionText = text.Substring(separator + 1);
+                     consumerId.SessionId = Int32.Parse(sessionText);
+                     text = text.Substring(0, separator);
+                 }
+                 catch(Exception sessionError)
+                 {
+                     Tracer.Debug(sessionError.Message);
+                 }
+             }
+         }
+         catch(Exception valueError)
+         {
+             Tracer.Debug(valueError.Message);
+         }
+     }
+
+     // Whatever was not consumed above is the connection id.
+     consumerId.ConnectionId = text;
+     return consumerId;
+ }
+
+ /// <summary>
+ /// Formats a producer id as "ConnectionId[:SessionId][:Value]". A session
+ /// id or value equal to zero is left out together with its separator.
+ /// </summary>
+ /// <param name="id">The producer id to format.</param>
+ /// <returns>The formatted id.</returns>
+ public static string ToStomp(ProducerId id)
+ {
+     StringBuilder text = new StringBuilder();
+     text.Append(id.ConnectionId);
+
+     if(id.SessionId != 0)
+     {
+         text.Append(":").Append(id.SessionId);
+     }
+
+     if(id.Value != 0)
+     {
+         text.Append(":").Append(id.Value);
+     }
+
+     return text.ToString();
+ }
+
+ /// <summary>
+ /// Parses a producer id from text of the form "ConnectionId:SessionId:Value".
+ /// The text is read from the right: the last ':'-separated segment is the
+ /// value and the one before it the session id. A segment that does not
+ /// parse as an integer is logged through Tracer.Debug and stays part of
+ /// the connection id.
+ /// </summary>
+ /// <param name="text">The text to parse; may be null.</param>
+ /// <returns>The parsed id, or null when <paramref name="text"/> is null.</returns>
+ public static ProducerId ToProducerId(string text)
+ {
+     if (text == null)
+     {
+         return null;
+     }
+     ProducerId answer = new ProducerId();
+     int idx = text.LastIndexOf(':');
+     if (idx >= 0) {
+         try
+         {
+             // The producer value is assigned from a 64-bit counter by the
+             // session, so parse it as Int64; Int32.Parse would overflow on
+             // large values and fold them into the connection id.
+             answer.Value = Int64.Parse(text.Substring(idx + 1));
+             text = text.Substring(0, idx);
+             idx = text.LastIndexOf(':');
+             if (idx >= 0) {
+                 try
+                 {
+                     // NOTE(review): SessionId keeps 32-bit parsing because its
+                     // declared type is not visible here - widen if it is a long.
+                     answer.SessionId = Int32.Parse(text.Substring(idx + 1));
+                     text = text.Substring(0, idx);
+                 }
+                 catch(Exception ex)
+                 {
+                     Tracer.Debug(ex.Message);
+                 }
+             }
+         }
+         catch(Exception ex)
+         {
+             Tracer.Debug(ex.Message);
+         }
+     }
+     // Whatever was not consumed above is the connection id.
+     answer.ConnectionId = text;
+     return answer;
+ }
+
+ /// <summary>
+ /// Formats a message id as the STOMP form of its producer id followed by
+ /// "[:BrokerSequenceId][:ProducerSequenceId]". A sequence id equal to zero
+ /// is left out together with its separator.
+ /// </summary>
+ /// <param name="id">The message id to format.</param>
+ /// <returns>The formatted id.</returns>
+ public static string ToStomp(MessageId id)
+ {
+     StringBuilder text = new StringBuilder();
+     text.Append(ToStomp(id.ProducerId));
+
+     if(id.BrokerSequenceId != 0)
+     {
+         text.Append(":").Append(id.BrokerSequenceId);
+     }
+
+     if(id.ProducerSequenceId != 0)
+     {
+         text.Append(":").Append(id.ProducerSequenceId);
+     }
+
+     return text.ToString();
+ }
+
+ /// <summary>
+ /// Parses a message id from text of the form
+ /// "ProducerId:BrokerSequenceId:ProducerSequenceId". The text is read from
+ /// the right: the last ':'-separated segment is the producer sequence id
+ /// and the one before it the broker sequence id. A segment that does not
+ /// parse as an integer is logged through Tracer.Debug and stays part of
+ /// the text handed to ToProducerId.
+ /// </summary>
+ /// <param name="text">The text to parse; may be null.</param>
+ /// <returns>The parsed id, or null when <paramref name="text"/> is null.</returns>
+ public static MessageId ToMessageId(string text)
+ {
+     if (text == null)
+     {
+         return null;
+     }
+
+     MessageId messageId = new MessageId();
+     int separator = text.LastIndexOf(':');
+     if (separator >= 0)
+     {
+         try
+         {
+             // Rightmost segment: the producer sequence id.
+             string producerSequenceText = text.Substring(separator + 1);
+             messageId.ProducerSequenceId = Int32.Parse(producerSequenceText);
+             text = text.Substring(0, separator);
+
+             // Next segment to the left: the broker sequence id.
+             separator = text.LastIndexOf(':');
+             if (separator >= 0)
+             {
+                 try
+                 {
+                     string brokerSequenceText = text.Substring(separator + 1);
+                     messageId.BrokerSequenceId = Int32.Parse(brokerSequenceText);
+                     text = text.Substring(0, separator);
+                 }
+                 catch(Exception brokerError)
+                 {
+                     Tracer.Debug(brokerError.Message);
+                 }
+             }
+         }
+         catch(Exception producerError)
+         {
+             Tracer.Debug(producerError.Message);
+         }
+     }
+
+     // The remaining text is parsed as the producer id.
+     messageId.ProducerId = ToProducerId(text);
+     return messageId;
+ }
+
+ /// <summary>
+ /// Formats a transaction id for STOMP. A LocalTransactionId is formatted by
+ /// the LocalTransactionId overload; any other id uses its ToString().
+ /// </summary>
+ /// <param name="id">The transaction id to format.</param>
+ /// <returns>The formatted id.</returns>
+ public static string ToStomp(TransactionId id)
+ {
+     LocalTransactionId localId = id as LocalTransactionId;
+     if (localId == null)
+     {
+         return id.ToString();
+     }
+     return ToStomp(localId);
+ }
+
+ /// <summary>
+ /// Formats a local transaction id as "ConnectionId.Value:Value".
+ /// </summary>
+ /// <param name="transactionId">The local transaction id to format.</param>
+ /// <returns>The connection id value and the transaction value joined by ':'.</returns>
+ public static string ToStomp(LocalTransactionId transactionId)
+ {
+     return String.Concat(transactionId.ConnectionId.Value, ":", transactionId.Value);
+ }
+
+ /// <summary>
+ /// Converts a STOMP header value to a boolean.
+ /// </summary>
+ /// <param name="text">The header text; may be null.</param>
+ /// <param name="defaultValue">Value returned when <paramref name="text"/> is null.</param>
+ /// <returns>
+ /// <paramref name="defaultValue"/> when the text is null; otherwise true
+ /// only when the text is "true" in any letter case.
+ /// </returns>
+ public static bool ToBool(string text, bool defaultValue)
+ {
+     if (text == null)
+     {
+         return defaultValue;
+     }
+
+     // Compare case-insensitively: bool.ToString() yields "True", which the
+     // previous exact match against "true"/"TRUE" treated as false.
+     return String.Equals(text, "true", StringComparison.OrdinalIgnoreCase);
+ }
+    }
+}

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Mon Oct 29 06:55:09 2007
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 using System.Reflection;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire.V1;
-using ActiveMQ.Transport;
-using NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire.V1;
+using Apache.ActiveMQ.Transport;
+using Apache.NMS;
 using System;
 using System.Collections;
 using System.IO;
 using System.Text;
 
-namespace ActiveMQ.Transport.Stomp
+namespace Apache.ActiveMQ.Transport.Stomp
 {
     /// <summary>
     /// Implements the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
@@ -50,8 +50,7 @@
 
         public void Marshal(Object o, BinaryWriter binaryWriter)
         {
- Console.WriteLine(">>>> " + o);
- //Console.Out.Flush();
+ Tracer.Debug(">>>> " + o);
  StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
 
  if (o is ConnectionInfo)
@@ -91,13 +90,11 @@
  response.CorrelationId = command.CommandId;
  SendCommand(response);
  }
- Console.WriteLine("#### Ignored command: " + o.GetType());
-                Console.Out.Flush();
+ Tracer.Debug("#### Ignored command: " + o.GetType());
  }
  else
  {
- Console.WriteLine("#### Ignored command: " + o.GetType());
-                Console.Out.Flush();
+ Tracer.Debug("#### Ignored command: " + o.GetType());
  }
         }
 
@@ -130,7 +127,7 @@
  }
  while (command == "");
 
- Console.WriteLine("<<<< command: " + command);
+ Tracer.Debug("<<<< command: " + command);
 
  IDictionary headers = new Hashtable();
  string line;
@@ -143,7 +140,7 @@
  string value = line.Substring(idx + 1);
  headers[key] = value;
 
- Console.WriteLine("<<<< header: " + key + " = " + value);
+ Tracer.Debug("<<<< header: " + key + " = " + value);
  }
  else
  {
@@ -156,6 +153,12 @@
  {
  int size = Int32.Parse(length);
  content = dis.ReadBytes(size);
+ // Read the terminating NULL byte for this frame.
+ int nullByte = dis.Read();
+ if(nullByte != 0)
+ {
+ Tracer.Debug("<<<< error reading frame null byte.");
+ }
  }
  else
  {
@@ -173,8 +176,7 @@
                 content = ms.ToArray();
  }
  Object answer = CreateCommand(command, headers, content);
- Console.WriteLine("<<<< received: " + answer);
- Console.Out.Flush();
+ Tracer.Debug("<<<< received: " + answer);
  return answer;
         }
 
@@ -217,7 +219,7 @@
  {
  return ReadMessage(command, headers, content);
  }
- Console.WriteLine("Unknown command: " + command + " headers: " + headers);
+ Tracer.Error("Unknown command: " + command + " headers: " + headers);
  return null;
  }
 
@@ -319,7 +321,10 @@
             ss.WriteHeader("selector", command.Selector);
             if ( command.NoLocal )
                 ss.WriteHeader("no-local", command.NoLocal);
- ss.WriteHeader("ack", "client");
+
+ if ( AcknowledgementMode.ClientAcknowledge == command.AcknowledgementMode
+ || AcknowledgementMode.AutoClientAcknowledge == command.AcknowledgementMode )
+ ss.WriteHeader("ack", "client");
 
  // ActiveMQ extensions to STOMP
  ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
@@ -328,7 +333,7 @@
     
  ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
  ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
- ss.WriteHeader("activemq.priority ", command.Priority);
+ ss.WriteHeader("activemq.priority", command.Priority);
             if ( command.Retroactive )
     ss.WriteHeader("activemq.retroactive", command.Retroactive);
 
@@ -393,12 +398,10 @@
  type = "ABORT";
  break;
  }
- Console.WriteLine(">>> For transaction type: " + transactionType + " we are using command type: " + type);
-
+
+ Tracer.Debug(">>> For transaction type: " + transactionType + " we are using command type: " + type);
  ss.WriteCommand(command, type);
-
  ss.WriteHeader("transaction", StompHelper.ToStomp(id));
-
  ss.Flush();
  }
  }
@@ -433,7 +436,14 @@
  else
  {
  ss.Content = command.Content;
- ss.ContentLength = command.Content.Length;
+ if(null != command.Content)
+ {
+ ss.ContentLength = command.Content.Length;
+ }
+ else
+ {
+ ss.ContentLength = 0;
+ }
  }
 
  IPrimitiveMap map = command.Properties;
@@ -460,7 +470,7 @@
  {
  if (transport == null)
  {
- Console.WriteLine("No transport configured so cannot return command: " + command);
+ Tracer.Fatal("No transport configured so cannot return command: " + command);
  }
  else
  {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Mon Oct 29 06:55:09 2007
@@ -14,18 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire;
+using Apache.ActiveMQ.Transport;
 using System;
-using System.Collections;
 using System.IO;
-using System.Net;
 using System.Net.Sockets;
 using System.Threading;
 
-namespace ActiveMQ.Transport.Tcp
+namespace Apache.ActiveMQ.Transport.Tcp
 {
 
     /// <summary>
@@ -33,11 +30,14 @@
     /// </summary>
     public class TcpTransport : ITransport
     {
-        private Socket socket;
-        private IWireFormat wireformat = new OpenWireFormat();
+ // Guards Start/Close. A dedicated object is used because a string literal is
+ // interned, which would make every TcpTransport instance share one monitor.
+ private readonly object initLock = new object();
+        private readonly Socket socket;
+     private IWireFormat wireformat;
         private BinaryReader socketReader;
+ // Guards socketReader. A dedicated object is used because a string literal is
+ // interned, which would make every TcpTransport instance share one monitor.
+ private readonly object socketReaderLock = new object();
         private BinaryWriter socketWriter;
-        private Thread readThread;
+ // Guards socketWriter. A dedicated object is used because a string literal is
+ // interned, which would make every TcpTransport instance share one monitor.
+ private readonly object socketWriterLock = new object();
+ private Thread readThread;
         private bool started;
         private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
         
@@ -55,30 +55,62 @@
         /// </summary>
         public void Start()
         {
-            if (!started)
-            {
-                if( commandHandler == null )
-                    throw new InvalidOperationException ("command cannot be null when Start is called.");
-                if( exceptionHandler == null )
-                    throw new InvalidOperationException ("exception cannot be null when Start is called.");
+ lock (initLock)
+ {
+ if (!started)
+ {
+ if (null == commandHandler)
+ {
+                 throw new InvalidOperationException(
+                 "command cannot be null when Start is called.");
+ }
 
-                started = true;
-                
-                // As reported in AMQ-988 it appears that NetworkStream is not thread safe
-                // so lets use an instance for each of the 2 streams
-                socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
-                socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
-                
-                // now lets create the background read thread
-                readThread = new Thread(new ThreadStart(ReadLoop));
-                readThread.Start();
-            }
+ if (null == exceptionHandler)
+             {
+             throw new InvalidOperationException(
+             "exception cannot be null when Start is called.");
+             }
+
+             started = true;
+                
+ // As reported in AMQ-988 it appears that NetworkStream is not thread safe
+ // so lets use an instance for each of the 2 streams
+ socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
+ socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
+                
+ // now lets create the background read thread
+ readThread = new Thread(ReadLoop);
+ readThread.Start();
+ }
+ }
         }
         
         public void Oneway(Command command)
         {
-            Wireformat.Marshal(command, socketWriter);
-            socketWriter.Flush();
+ lock (socketWriterLock)
+ {
+ try
+ {
+ Wireformat.Marshal(command, socketWriter);
+ socketWriter.Flush();
+ }
+ catch(Exception ex)
+ {
+ if (command.ResponseRequired)
+ {
+ // Make sure that something higher up doesn't get blocked.
+ // Respond with an exception.
+ ExceptionResponse er = new ExceptionResponse();
+ BrokerError error = new BrokerError();
+
+ error.Message = "Transport connection error: " + ex.Message;
+ error.ExceptionClass = ex.ToString();
+ er.Exception = error;
+ er.CorrelationId = command.CommandId;
+ commandHandler(this, er);
+ }
+ }
+ }
         }
         
         public FutureResponse AsyncRequest(Command command)
@@ -93,14 +125,50 @@
 
         public void Close()
         {
-            if (closed.CompareAndSet(false, true))
-            {
-                socket.Close();
-                if (System.Threading.Thread.CurrentThread != readThread)
-                    readThread.Join();
-                socketWriter.Close();
-                socketReader.Close();
-            }
+ lock (initLock)
+ {
+ if (closed.CompareAndSet(false, true))
+ {
+ try
+ {
+ socket.Shutdown(SocketShutdown.Both);
+ }
+ catch
+ {
+ }
+
+ lock (socketWriterLock)
+ {
+ if(null != socketWriter)
+ {
+             socketWriter.Close();
+ socketWriter = null;
+ }
+ }
+
+ lock (socketReaderLock)
+ {
+ if(null != socketReader)
+ {
+ socketReader.Close();
+ socketReader = null;
+ }
+ }
+
+ socket.Close();
+
+ if(null != readThread
+ && Thread.CurrentThread != readThread
+ && readThread.IsAlive)
+ {
+ readThread.Abort();
+ readThread.Join();
+ readThread = null;
+ }
+ }
+
+ started = false;
+ }
         }
 
         public void Dispose()
@@ -133,11 +201,11 @@
                 }
                 catch(Exception ex)
                 {
-                    if( !closed.Value )
+                    if (!closed.Value)
                     {
-                        this.exceptionHandler(this, ex);
-                        // Close the socket as there's little that can be done with this transport now.
-                        Close();
+ // Close the socket as there's little that can be done with this transport now.
+ Close();
+ this.exceptionHandler(this, ex);
                         break;
                     }
                 }
@@ -149,7 +217,7 @@
  this.commandHandler(this, command);
  }
                 }
-                catch ( Exception e)
+                catch (Exception e)
                 {
                     this.exceptionHandler(this, e);
                 }
@@ -158,12 +226,14 @@
                 
         // Implementation methods
                 
-        public CommandHandler Command {
+        public CommandHandler Command
+ {
             get { return commandHandler; }
             set { this.commandHandler = value; }
         }
 
-        public  ExceptionHandler Exception {
+        public  ExceptionHandler Exception
+ {
             get { return exceptionHandler; }
             set { this.exceptionHandler = value; }
         }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Mon Oct 29 06:55:09 2007
@@ -18,88 +18,106 @@
 using System;
 using System.Net;
 using System.Net.Sockets;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.Transport;
-using ActiveMQ.Transport.Stomp;
-using ActiveMQ.Util;
-
-namespace ActiveMQ.Transport.Tcp {
-        public class TcpTransportFactory : ITransportFactory
-        {
-                private bool useLogging = false;
-
-                public bool UseLogging {
-                        get { return useLogging; }
-                        set { useLogging = value; }
-                }
-
-                public ITransport CreateTransport(Uri location)
-                {
-                        // Extract query parameters from broker Uri
-                        System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(location.Query);
-
-                        // Set transport. properties on this (the factory)
-                        URISupport.SetProperties(this, map, "transport.");
-
-                        // Console.WriteLine("Opening socket to: " + host + " on port: " + port);
-                        Socket socket = Connect(location.Host, location.Port);
- IWireFormat wireformat = CreateWireFormat(location, map);
-                        TcpTransport tcpTransport = new TcpTransport(socket, wireformat);
- wireformat.Transport = tcpTransport;
-                        ITransport rc = tcpTransport;
-
-                        if (UseLogging)
-                        {
-                                rc = new LoggingTransport(rc);
-                        }
-
- if (wireformat is OpenWireFormat)
- {
-                        rc = new WireFormatNegotiator(rc, (OpenWireFormat) wireformat);
- }
-                        rc = new MutexTransport(rc);
-                        rc = new ResponseCorrelator(rc);
-
-                        return rc;
-                }
-
-                protected Socket Connect(string host, int port)
-                {
-                        // Looping through the AddressList allows different type of connections to be tried
-                        // (IPv4, IPv6 and whatever else may be available).
-                        IPHostEntry hostEntry = Dns.GetHostByName(host);
-                        foreach (IPAddress address in hostEntry.AddressList)
-                        {
-                                Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
-                                socket.Connect(new IPEndPoint(address, port));
-                                if (socket.Connected)
-                                {
-                                        return socket;
-                                }
-                        }
-                        throw new SocketException();
-                }
-
- protected IWireFormat CreateWireFormat(Uri location, System.Collections.Specialized.StringDictionary map)
+using System.Collections.Specialized;
+using Apache.ActiveMQ.OpenWire;
+using Apache.ActiveMQ.Transport.Stomp;
+using Apache.ActiveMQ.Util;
+using Apache.NMS;
+
+namespace Apache.ActiveMQ.Transport.Tcp
+{
+ public class TcpTransportFactory : ITransportFactory
+ {
+ private bool useLogging = false;
+
+ public TcpTransportFactory()
+ {
+ }
+
+ public bool UseLogging
+ {
+ get { return useLogging; }
+ set { useLogging = value; }
+ }
+
+ #region ITransportFactory Members
+
+ /// <summary>
+ /// Builds the transport stack for a broker URI: a TcpTransport over a connected
+ /// socket, optionally wrapped in a LoggingTransport, then a WireFormatNegotiator
+ /// (OpenWire only), a MutexTransport and finally a ResponseCorrelator.
+ /// </summary>
+ /// <param name="location">Broker URI. Host and port give the endpoint; query
+ /// parameters prefixed "transport." are applied to this factory, and those
+ /// prefixed "wireFormat." are handled by CreateWireFormat.</param>
+ /// <returns>The outermost transport of the chain (the ResponseCorrelator).</returns>
+ public ITransport CreateTransport(Uri location)
+ {
+     // Extract query parameters from broker Uri
+     StringDictionary map = URISupport.ParseQuery(location.Query);
+
+     // Set transport. properties on this (the factory)
+     URISupport.SetProperties(this, map, "transport.");
+
+     // Build the wire format before opening the socket, so that a failure while
+     // applying the wireFormat. options cannot leave a connected socket behind.
+     IWireFormat wireformat = CreateWireFormat(location, map);
+
+     Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+     Socket socket = Connect(location.Host, location.Port);
+     ITransport transport = new TcpTransport(socket, wireformat);
+
+     wireformat.Transport = transport;
+
+     if(UseLogging)
+     {
+         transport = new LoggingTransport(transport);
+     }
+
+     // Only OpenWire negotiates its wire format with the broker.
+     if(wireformat is OpenWireFormat)
+     {
+         transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
+     }
+
+     transport = new MutexTransport(transport);
+     transport = new ResponseCorrelator(transport);
+
+     return transport;
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Opens a TCP socket to the first resolved address of <paramref name="host"/>
+ /// that accepts a connection.
+ /// </summary>
+ /// <param name="host">Host name or address, resolved through Dns.GetHostEntry.</param>
+ /// <param name="port">TCP port to connect to.</param>
+ /// <returns>A connected socket.</returns>
+ /// <exception cref="SocketException">No resolved address could be connected. When an
+ /// attempt failed with a SocketException, the error code of the last such failure
+ /// is carried over.</exception>
+ protected Socket Connect(string host, int port)
+ {
+     // Looping through the AddressList allows different type of connections to be tried
+     // (IPv4, IPv6 and whatever else may be available).
+     IPHostEntry hostEntry = Dns.GetHostEntry(host);
+     SocketException lastError = null;
+     foreach(IPAddress address in hostEntry.AddressList)
+     {
+         Socket socket = null;
+         try
+         {
+             socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+             socket.Connect(new IPEndPoint(address, port));
+             if(socket.Connected)
  {
- // TODO detect STOMP etc
- if ("stomp".Equals(location.Scheme))
- {
- IWireFormat answer = new StompWireFormat();
-
-                    // Set wireformat. properties on the wireformat owned by the tcpTransport
-                    URISupport.SetProperties(answer, map, "wireFormat.");
- return answer;
- }
- else
- {
- OpenWireFormat answer = new OpenWireFormat();
-
-                    // Set wireformat. properties on the wireformat owned by the tcpTransport
-                    URISupport.SetProperties(answer.PreferedWireFormatInfo, map, "wireFormat.");
- return answer;
- }
+                 return socket;
  }
-        }
+         }
+         catch(SocketException ex)
+         {
+             // Socket.Connect reports failure by throwing; remember it and
+             // fall through so the next address gets its chance.
+             lastError = ex;
+         }
+
+         // This address did not work out: release the socket before moving on.
+         if(null != socket)
+         {
+             socket.Close();
+         }
+     }
+
+     if(null != lastError)
+     {
+         // Same exception type as before, keeping the underlying error code.
+         throw new SocketException(lastError.ErrorCode);
+     }
+     throw new SocketException();
+ }
+
+ /// <summary>
+ /// Picks the wire format from the URI scheme ("stomp", compared ignoring case,
+ /// selects STOMP; anything else selects OpenWire) and applies the "wireFormat."
+ /// options from <paramref name="map"/> to it.
+ /// </summary>
+ /// <param name="location">Broker URI; only its scheme is examined here.</param>
+ /// <param name="map">Parsed query parameters of the broker URI.</param>
+ /// <returns>The newly created wire format.</returns>
+ protected IWireFormat CreateWireFormat(Uri location, StringDictionary map)
+ {
+     IWireFormat result;
+     object optionTarget;
+
+     bool isStomp = (0 == String.Compare(location.Scheme, "stomp", true));
+     if(!isStomp)
+     {
+         // OpenWire options are set on the preferred wire format info,
+         // not on the wire format object itself.
+         OpenWireFormat openWire = new OpenWireFormat();
+
+         result = openWire;
+         optionTarget = openWire.PreferedWireFormatInfo;
+     }
+     else
+     {
+         StompWireFormat stomp = new StompWireFormat();
+
+         result = stomp;
+         optionTarget = stomp;
+     }
+
+     if(optionTarget != null)
+     {
+         // Set wireformat. properties on the wireformat owned by the tcpTransport
+         URISupport.SetProperties(optionTarget, map, "wireFormat.");
+     }
+
+     return result;
+ }
+ }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 
  /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Mon Oct 29 06:55:09 2007
@@ -16,13 +16,13 @@
  */
 using System.IO;
 using System.Threading;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire;
+using Apache.ActiveMQ.Transport;
 using System;
-using ActiveMQ.Util;
+using Apache.ActiveMQ.Util;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 using System;
 using System.Text;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
     public class AtomicBoolean
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 using System;
 using System.Threading;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
     class CountDownLatch
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs Mon Oct 29 06:55:09 2007
@@ -16,39 +16,49 @@
  */
 using System;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
- internal class DateUtils
+ public class DateUtils
  {
  /// <summary>
- /// The difference between the Windows epoch (1601-01-01 00:00:00)
- /// and the Unix epoch (1970-01-01 00:00:00) in milliseconds.
+ /// The start of the Windows epoch
  /// </summary>
- public static readonly long EPOCH_DIFF = 11644473600000L;
-
-        /// <summary>
-        /// The start of the UNIX epoch
-        /// </summary>
-        public static readonly DateTime UNIX_EPOCH = new DateTime(1970, 1, 1, 0, 0, 0, 0);
+ public static readonly DateTime windowsEpoch = new DateTime(1601, 1, 1, 0, 0, 0, 0);
+ /// <summary>
+ /// The start of the Java epoch
+ /// </summary>
+ public static readonly DateTime javaEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
 
  /// <summary>
- /// Method ToJavaTime
+ /// The difference between the Windows epoch and the Java epoch
+ /// in milliseconds.
  /// </summary>
- /// <param name="timeToLive">A  TimeSpan</param>
- /// <returns>A  long</retutns>
- public static long ToJavaTime(TimeSpan timeToLive)
+ public static readonly long epochDiff; /* = 11644473600000L; */
+
+ /// <summary>
+ /// Computes epochDiff from the two epoch constants instead of hard-coding it.
+ /// </summary>
+ static DateUtils()
+ {
+     long javaEpochFileTime = javaEpoch.ToFileTimeUtc();
+     long windowsEpochFileTime = windowsEpoch.ToFileTimeUtc();
+     long fileTimeSpan = javaEpochFileTime - windowsEpochFileTime;
+
+     // Scale the file-time difference down to milliseconds.
+     epochDiff = fileTimeSpan / TimeSpan.TicksPerMillisecond;
+ }
+
+ /// <summary>
+ /// Converts a DateTime to milliseconds relative to the Java epoch, going
+ /// through DateTime.ToFileTime.
+ /// </summary>
+ /// <param name="dateTime">The point in time to convert.</param>
+ /// <returns>Milliseconds since the Java epoch.</returns>
+ public static long ToJavaTime(DateTime dateTime)
+ {
+     long fileTimeMillis = dateTime.ToFileTime() / TimeSpan.TicksPerMillisecond;
+     return fileTimeMillis - epochDiff;
+ }
+
+ /// <summary>
+ /// Converts milliseconds relative to the Java epoch into a DateTime, going
+ /// through DateTime.FromFileTime.
+ /// </summary>
+ /// <param name="javaTime">Milliseconds since the Java epoch.</param>
+ /// <returns>The corresponding DateTime as produced by DateTime.FromFileTime.</returns>
+ public static DateTime ToDateTime(long javaTime)
  {
- return ToJavaTime(new DateTime(timeToLive.Ticks));
+ long fileTimeMillis = javaTime + epochDiff;
+ return DateTime.FromFileTime(fileTimeMillis * TimeSpan.TicksPerMillisecond);
  }
 
-    public static long ToJavaTime(DateTime dateTime)
+ /// <summary>
+ /// Converts a DateTime to milliseconds relative to the Java epoch, going
+ /// through DateTime.ToFileTimeUtc.
+ /// </summary>
+ /// <param name="dateTime">The point in time to convert.</param>
+ /// <returns>Milliseconds since the Java epoch.</returns>
+ public static long ToJavaTimeUtc(DateTime dateTime)
  {
- return dateTime.ToFileTime() - EPOCH_DIFF;
+ long utcFileTimeMillis = dateTime.ToFileTimeUtc() / TimeSpan.TicksPerMillisecond;
+ return utcFileTimeMillis - epochDiff;
  }
 
-        public static DateTime ToDateTime(long dateTime)
-        {
-            return UNIX_EPOCH.AddMilliseconds(dateTime);
-        }
+ /// <summary>
+ /// Converts milliseconds relative to the Java epoch into a DateTime, going
+ /// through DateTime.FromFileTimeUtc.
+ /// </summary>
+ /// <param name="javaTime">Milliseconds since the Java epoch.</param>
+ /// <returns>The corresponding DateTime as produced by DateTime.FromFileTimeUtc.</returns>
+ public static DateTime ToDateTimeUtc(long javaTime)
+ {
+     long fileTimeMillis = javaTime + epochDiff;
+     return DateTime.FromFileTimeUtc(fileTimeMillis * TimeSpan.TicksPerMillisecond);
+ }
  }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs Mon Oct 29 06:55:09 2007
@@ -19,7 +19,7 @@
 using System.Globalization;
 using System.Text;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
  /// <summary>
  /// Class to provide support for URI query parameters which uses .Net reflection

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs Mon Oct 29 06:55:09 2007
@@ -14,19 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using NMS;
+using Apache.NMS;
 using System;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
  public delegate void AcknowledgeHandler(BaseMessage baseMessage);
 
     public class BaseMessage : IMessage
     {
-        private PrimitiveMap properties;
+        private PrimitiveMap propertiesMap = new PrimitiveMap();
         private IDestination destination;
         private string correlationId;
-        private TimeSpan expiration;
+        private TimeSpan timeToLive;
         private string messageId;
         private bool persistent;
         private byte priority;
@@ -34,7 +34,7 @@
         private string type;
         private event AcknowledgeHandler Acknowledger;
         private byte[] content;
-        private DateTime timestamp;
+        private DateTime timestamp = new DateTime();
 
         public byte[] Content
         {
@@ -60,7 +60,7 @@
         public IPrimitiveMap Properties
         {
             get {
-                return properties;
+ return propertiesMap;
             }
         }
         
@@ -98,13 +98,13 @@
         /// <summary>
         /// The time in milliseconds that this message should expire in
         /// </summary>
-        public TimeSpan NMSExpiration
+        public TimeSpan NMSTimeToLive
         {
             get {
-                return expiration;
+ return timeToLive;
             }
             set {
-                expiration = value;
+ timeToLive = value;
             }
         }
         

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs Mon Oct 29 06:55:09 2007
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 using System;
-using NMS;
+using Apache.NMS;
 
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
  public class BytesMessage : BaseMessage, IBytesMessage
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs Mon Oct 29 06:55:09 2007
@@ -6,7 +6,7 @@
 //------------------------------------------------------------------------------
 // <auto-generated>
 //     This code was generated by a tool.
-//     Runtime Version:2.0.50727.42
+//     Runtime Version:2.0.50727.832
 //
 //     Changes to this file may cause incorrect behavior and will be lost if
 //     the code is regenerated.

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ;
-using NMS;
+using Apache.NMS;
 using System;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     /// <summary>
     /// Represents a NMS connection MSMQ.  Since the underlying MSMQ APIs are actually

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs Mon Oct 29 06:55:09 2007
@@ -14,31 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using NMS;
 using System;
+using Apache.NMS;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     /// <summary>
     /// A Factory that can establish NMS connections to MSMQ
     /// </summary>
     public class ConnectionFactory : IConnectionFactory
-    {  
-        //
-        // Creates a connection to MSMQ
-        //
-        public IConnection CreateConnection()
+    {
+ public ConnectionFactory()
+ {
+ }
+
+ public ConnectionFactory(Uri brokerUri, string clientID)
+ {
+ }
+
+     /// <summary>
+ /// Creates a new connection to MSMQ.
+ /// </summary>
+ public IConnection CreateConnection()
         {
             return new Connection();
         }
-        
-        //
-        // Creates a connection to MSQM
-        //
-        public IConnection CreateConnection(string userName, string password)
+
+ /// <summary>
+ /// Creates a new connection to MSMQ.
+ /// </summary>
+ public IConnection CreateConnection(string userName, string password)
         {
             return new Connection();
         }
-                
-    }
+
+ /// <summary>
+ /// Creates a new connection to MSMQ.
+ /// </summary>
+ public IConnection CreateConnection(string userName, string password, bool useLogging)
+ {
+ return new Connection();
+ }
+ }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs Mon Oct 29 06:55:09 2007
@@ -17,9 +17,9 @@
 using System;
 using System.Text;
 using System.Messaging;
-using NMS;
+using Apache.NMS;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     public class DefaultMessageConverter : IMessageConverter
  {
@@ -34,7 +34,7 @@
             }
             //if (message.NMSExpiration != null)
             //{
-                answer.TimeToBeReceived = message.NMSExpiration;
+                answer.TimeToBeReceived = message.NMSTimeToLive;
             //}
             if (message.NMSCorrelationID != null)
             {
@@ -58,7 +58,7 @@
  answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
  answer.NMSType = message.Label;
  answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
- answer.NMSExpiration = message.TimeToBeReceived;
+ answer.NMSTimeToLive = message.TimeToBeReceived;
             return answer;
         }
 

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs Mon Oct 29 06:55:09 2007
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using NMS;
+using Apache.NMS;
 using System;
-namespace MSMQ
+namespace Apache.MSMQ
 {
     
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs Mon Oct 29 06:55:09 2007
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 using System.Messaging;
-using NMS;
+using Apache.NMS;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     public interface IMessageConverter
     {