View Javadoc

1   /* (c) Copyright 2003 Caleigo AB, All rights reserved. 
2    * 
3    * This library is free software; you can redistribute it and/or
4    * modify it under the terms of the GNU Lesser General Public
5    * License as published by the Free Software Foundation; either
6    * version 2.1 of the License, or (at your option) any later version.
7    * 
8    * This library is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11   * Lesser General Public License for more details.
12   * 
13   * You should have received a copy of the GNU Lesser General Public
14   * License along with this library; if not, write to the Free Software
15   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16   *  
17   */
18  
19  package org.caleigo.toolkit.tunnel;
20  
21  
22  import java.io.*;
23  import java.util.*;
24  
25  import org.caleigo.security.*;
26  import org.caleigo.service.*;
27  import org.caleigo.toolkit.log.Log;
28  
29  /*** Abstract class that implements most of the functionality needed by an ITunnel.
30   * Only two methods need to be implemented by subclasses: getOutputStream and getInputStream.
31   * Optionally the method createPackage can be overridden to provide custom implementations
32   * of the IPackage interface. 
33   *
34   * @author  Mattias Hagstrand
35   * @version 1.00
36   * 
37   *//* 
38   *
39   * WHEN        WHO               WHY & WHAT
40   * -----------------------------------------------------------------------------
41   * 2002-07-01  Mattias Hagstrand Creation
42   */
43  public abstract class AbstractTunnel implements ITunnel
44  {
45      // Constants ---------------------------------------------------------------
46      public static final int DEBUG = 0;
47      public static final int INFO = 1;
48      public static final int WARNING = 2;
49      public static final int ERROR = 3;
50      
51      public static final int DEFAULT_NBR_OF_WORKER_THREADS = 5;
52      
53      // Class members -----------------------------------------------------------
54      protected static int sNbrOfWorkerThreads;
55      
56      // Data members ------------------------------------------------------------
57      protected List mMessageConsumers;
58      protected ITunnelCodec mCodec;
59      protected List mPackers;
60      protected List mStatusCallbacks;
61      protected int mLogLevel;
62      protected List mOutQueue;
63      protected List mInQueue;
64      protected List mTunnelListeners;
65      
66      private boolean mIsInitialized;
67      protected int mCurrentPackageID;
68      protected WorkerThread[] mWorkerThreads;
69      protected OutputThread mOutputThread;
70      protected InputThread mInputThread;
71      protected Map mReplyMessages;
72      
73      // Static methods ----------------------------------------------------------
74      static
75      {
76          sNbrOfWorkerThreads = DEFAULT_NBR_OF_WORKER_THREADS;
77      }
78      
79      /*** Sets the number of worker threads for all tunnels that are initilized
80       * after this methods has been called. Calling this method has no effect on
81       * tunnels that have allready been initilized.
82       */
83      public static void setNbrOfWorkerThreads(int nbrOfWorkerThreads)
84      {
85          if (nbrOfWorkerThreads < 1)
86              throw new IllegalArgumentException("nbrOfWorkerThreads < 1");
87          
88          sNbrOfWorkerThreads = nbrOfWorkerThreads;
89      }
90      
91      // Constructors ------------------------------------------------------------
92      
/**
 * Creates a tunnel that encodes and decodes its packages with a newly
 * created SerializeCodec. Equivalent to calling the codec constructor
 * with <code>new SerializeCodec()</code>.
 */
public AbstractTunnel()
{
    this(new SerializeCodec());
}
100     
/**
 * Creates a tunnel that uses the given codec. All consumer, packer,
 * callback and listener lists start out empty, both queues are empty and
 * the log level is WARNING. The tunnel is not usable until initialize()
 * has been called.
 *
 * @param codec the codec to use; stored as is, without validation.
 */
public AbstractTunnel(ITunnelCodec codec)
{
    // Registration lists
    mMessageConsumers = new ArrayList();
    mPackers = new ArrayList();
    mStatusCallbacks = new ArrayList();
    mTunnelListeners = new ArrayList();

    // Package queues and pending replies
    mOutQueue = new ArrayList();
    mInQueue = new ArrayList();
    mReplyMessages = new HashMap();

    mLogLevel = AbstractTunnel.WARNING;
    mCodec = codec;
}
114     
115     // ITunnel implementation --------------------------------------------------
116     
117     /***
118      * Initializes the ITunnel. This method must be called before any messages
119      * can be sent. After this method has been called it is not possible to
120      * set the ITunnelCodec or to add or remove ITunnelPackers. Subclasses of
121      * AbstractTunnel must call super.initialize(). Successful initialization
122      * recuires that the methods getInStream and getOutStream does not return
123      * <code>null</code>.
124      */
125     public synchronized void initialize()
126         throws IllegalStateException, TunnelException
127     {
128         if (mIsInitialized)
129             throw new IllegalStateException("The tunnel is initialized");
130 
131         mIsInitialized = true;
132         
133         mOutputThread = new OutputThread();
134         mOutputThread.start();
135         mInputThread = new InputThread();
136         mInputThread.start();
137         
138         // Create and start worker threads
139         mWorkerThreads = new WorkerThread[sNbrOfWorkerThreads];
140         for (int i = 0; i < mWorkerThreads.length; i++)
141         {
142             mWorkerThreads[i] = new WorkerThread();
143             mWorkerThreads[i].start();
144         }
145     }
146     
147     /***
148      * Performs cleanup for this ITunnel. No messages can be sent after this
149      * method has been called. This method will send all messages in the message
150      * queue before closing down. Subclasses of AbstractTunnel must call super.finalize()
151      * before they do any finalization of their own.
152      */
153     public synchronized void finalize()
154         throws IllegalStateException, TunnelException
155     {
156         if (!mIsInitialized)
157             throw new IllegalStateException("The tunnel is not initialized");
158 
159         //mIsInitialized = false;
160         
161         // Stop receiving messages
162         mInputThread.stopThread();
163 
164 //        // Stop worker threads
165 //        for (int i = 0; i < mWorkerThreads.length; i++)
166 //            mWorkerThreads[i].stopWorkerThread();
167 //        
168 //        synchronized (mInQueue)
169 //        {
170 //            mInQueue.notifyAll();
171 //        }
172         
173         synchronized (mOutQueue)
174         {
175             // Remove all packages from the out queue except those what conatin a finalize message
176             Iterator packages = mOutQueue.iterator();
177             while (packages.hasNext())
178             {
179                 IPackage currentPackage = (IPackage) packages.next();
180                 if (!(currentPackage.getMessage() instanceof IFinalizeMessage))
181                     packages.remove();
182             }
183             
184             // Wait for the finalize messages to be sent
185             try
186             {
187                 while (mOutQueue.size() > 0)
188                     mOutQueue.wait();
189             }
190             catch (InterruptedException ie)
191             {
192                 Log.printError(this, "Interrupted while waiting for finalize message to be sent", ie);
193             }
194         }
195         
196         mIsInitialized = false;
197         
198         // Stop worker threads
199         for (int i = 0; i < mWorkerThreads.length; i++)
200             mWorkerThreads[i].stopWorkerThread();
201         
202         synchronized (mInQueue)
203         {
204             mInQueue.notifyAll();
205         }
206         
207         // Stop message sender
208         mOutputThread.stopThread();
209         
210         synchronized (mOutQueue)
211         {
212             mOutQueue.notifyAll();
213         }
214         
215         this.fireTunnelClosedEvent();
216     }
217     
218     /*** Returns <code>true</code> if this tunnel is active, that is, if it is
219      * correctly initialized. If this method returns <code>true</code> then
220      * a call to finalize must not throw an IllegalStateException.
221      */
222     public synchronized boolean isActive()
223     {
224         return mIsInitialized;
225     }
226     
227     /***
228      * Sends a ping message over this ITunnel. This method is typically used to
229      * check that the communication is working.
230      *
231      * @return  the ping time in milli seconds.
232      */
233     public int ping()
234         throws TunnelException
235     {
236         if (!mIsInitialized)
237             throw new TunnelException("The tunnel is not initialized");
238         
239         // Create ping package
240         IPackage pingPackage = this.createPackage("", this.getSessionID(""));
241         pingPackage.setType(IPackage.PING);
242         
243         // Add the package to the out queue and wait for a reply
244         long startTime = System.currentTimeMillis();
245         this.addToOutQueue(pingPackage);
246         this.waitForReply(pingPackage);
247         
248         // Calculate and return the ping time
249         return (int) (System.currentTimeMillis() - startTime);
250     }
251     
252     /***
253      * Sends a message over this ITunnel. This method returns after the message
254      * has been sent.
255      */
256     public void send(Object message)
257         throws TunnelException
258     {
259         if (!mIsInitialized)
260             throw new TunnelException("The tunnel is not initialized");
261         
262         IPackage outPackage = this.createPackage(message, this.getSessionID(message));
263         outPackage.setType(IPackage.DATA);
264         this.addToOutQueue(outPackage);
265     }
266     
267     /***
268      * Sends a request/reply message over this ITunnel. This method blocks until
269      * the reply is received.
270      */
271     public Object ask(Object message)
272         throws TunnelException
273     {
274         if (!mIsInitialized)
275             throw new TunnelException("The tunnel is not initialized");
276         
277         IPackage outPackage = this.createPackage(message, this.getSessionID(message));
278         outPackage.setType(IPackage.REQUEST);
279         this.addToOutQueue(outPackage);
280         return this.waitForReply(outPackage);
281     }
282 
283     /***
284      * Adds the consumer to the end of this ITunnels message consumer list.
285      */
286     public synchronized void addMessageConsumer(IMessageConsumer consumer)
287     {
288         if (!mMessageConsumers.contains(consumer))
289             mMessageConsumers.add(consumer);
290     }
291     
292     /***
293      * Removes the consumer from this ITunnels message consumer list.
294      */
295     public synchronized void remomveMessageConsumer(IMessageConsumer consumer)
296     {
297         mMessageConsumers.remove(consumer);
298     }
299     
300     /***
301      * Sets the ITunnelCodec that should be used by this ITunnel.
302      */
303     public synchronized void setCodec(ITunnelCodec codec)
304         throws IllegalStateException
305     {
306         if (mIsInitialized)
307             throw new IllegalStateException("The tunnel is initialized");
308 
309         mCodec = codec;
310     }
311     
312     /***
313      * Adds the packer to the end of this ITunnels packer list.
314      */
315     public synchronized void addPacker(ITunnelPacker packer)
316         throws IllegalStateException
317     {
318         if (mIsInitialized)
319             throw new IllegalStateException("The tunnel is initialized");
320 
321         mPackers.add(packer);
322     }
323     
324     /***
325      * Removes the packer from this ITunnels packer list.
326      */
327     public synchronized void remomvePacker(ITunnelPacker packer)
328         throws IllegalStateException
329     {
330         if (mIsInitialized)
331             throw new IllegalStateException("The tunnel is initialized");
332         
333         mPackers.remove(packer);
334     }
335     
336     /***
337      * Adds the callback to the list of IStatusCallbacks that receives status
338      * messages from this ITunnel.
339      */
340     public synchronized void addStatusCallback(IStatusCallback callback)
341     {
342         if (!mStatusCallbacks.contains(callback))
343             mStatusCallbacks.add(callback);
344     }
345     
346     /***
347      * Removes the callback from the list of IStatusCallbacks.
348      */
349     public synchronized void removeStatusCallback(IStatusCallback callback)
350     {
351         mStatusCallbacks.remove(callback);
352     }
353     
354     /***
355      * Sets the log level for this ITunnel.
356      */
357     public void setLogLevel(int level)
358     {
359         mLogLevel = level;
360     }
361     
362     /***
363      * Gets the log level for this ITunnel.
364      */
365     public int getLogLevel()
366     {
367         return mLogLevel;
368     }
369     
370     public void addTunnelListener(ITunnelListener listener)
371     {
372         if (!mTunnelListeners.contains(listener))
373             mTunnelListeners.add(listener);
374     }
375     
376     public void removeTunnelListener(ITunnelListener listener)
377     {
378         mTunnelListeners.remove(listener);
379     }
380     
381     /*** Returns a humanly readably text that describes the ITunnel.
382      */
383     public String getDescription()
384     {
385         return "[no description avilable]";
386     }
387     
388     // Action methods ----------------------------------------------------------
389     
390     /***
391      * Creates an IPackage that contains the provided message.
392      */
393     public IPackage createPackage(Object message, int sessionID)
394     {
395         return new DefaultPackage(this.getNextPackageID(), message, sessionID);
396     }
397     
398     // Abstract methods --------------------------------------------------------
399     
400     /***
401      * Returns an OutputStream that should be used to send messages.
402      */
403     protected abstract OutputStream getOutputStream();
404     
405     /***
406      * Returns an InputStream that should be used to receive messages.
407      */
408     protected abstract InputStream getInputStream();
409     
410     // Help methods ------------------------------------------------------------
411     protected IConnectionMessage createConnectionMessage()
412     {
413         // Get set up information
414         String codecClassName = mCodec.getClass().getName();
415         String[] packerClassNames = new String[mPackers.size()];
416         for (int i = 0; i < packerClassNames.length; i++)
417             packerClassNames[i] = mPackers.get(i).getClass().getName();
418         
419         return new DefaultConnectionMessage(codecClassName, packerClassNames);
420     }
421         
422     
423     protected void log(Object source, int logLevel, String message)
424     {
425         this.log(source, logLevel, message, null);
426     }
427     
428     protected void log(Object source, int logLevel, String message, Throwable e)
429     {
430         if (this.getLogLevel() > logLevel)
431             return;
432         
433         switch (logLevel)
434         {
435             case DEBUG:
436             case INFO:
437                 if (e != null)
438                     Log.print(source, message, e);
439                 else
440                     Log.print(source, message);
441                 break;
442             case WARNING:
443                 if (e != null)
444                     Log.printWarning(source, message, e);
445                 else
446                     Log.printWarning(source, message);
447                 break;
448             case ERROR:
449                 if (e != null)
450                     Log.printError(source, message, e);
451                 else
452                     Log.printError(source, message);
453                 break;
454         }
455     }
456     
457     protected void fireTunnelClosedEvent()
458     {
459         TunnelEvent event = new TunnelEvent(this, TunnelEvent.TUNNEL_CLOSED);
460         for (int i = 0; i < mTunnelListeners.size(); i++)
461             ((ITunnelListener) mTunnelListeners.get(i)).tunnelClosed(event);
462     }
463     
464     protected void fireConnectionLostEvent()
465     {
466         TunnelEvent event = new TunnelEvent(this, TunnelEvent.CONNECTION_LOST);
467         for (int i = 0; i < mTunnelListeners.size(); i++)
468             ((ITunnelListener) mTunnelListeners.get(i)).connectionLost(event);
469     }
470     
471     protected synchronized int getNextPackageID()
472     {
473         if (mCurrentPackageID < Integer.MAX_VALUE)
474             mCurrentPackageID++;
475         else
476             mCurrentPackageID = 1;
477         
478         return mCurrentPackageID;
479     }
480     
481     protected void addToOutQueue(IPackage outPackage)
482     {
483         synchronized (mOutQueue)
484         {
485             this.log(this, AbstractTunnel.DEBUG, "Added package to out queue: " + outPackage.toString());
486             mOutQueue.add(outPackage);
487             mOutQueue.notifyAll();
488         }
489     }
490     
491     protected IPackage getNextFromOutQueue()
492     {
493         synchronized (mOutQueue)
494         {
495             while (mOutQueue.isEmpty() && mIsInitialized)
496                 try
497                 {
498                     mOutQueue.wait();
499                 }
500                 catch (InterruptedException ie) {}
501             
502             if (!mIsInitialized)
503                 return null;
504             
505             IPackage thePackage = (IPackage) mOutQueue.remove(0);
506             this.log(this, AbstractTunnel.DEBUG, "Removed package from out queue: " + thePackage.toString());
507             mOutQueue.notifyAll();
508             return thePackage;
509         }
510     }
511     
512     protected void addToInQueue(IPackage inPackage)
513     {
514         synchronized (mInQueue)
515         {
516             this.log(this, AbstractTunnel.DEBUG, "Added package to in queue: " + inPackage.toString());
517             mInQueue.add(inPackage);
518             mInQueue.notifyAll();
519         }
520     }
521     
522     protected IPackage getNextFromInQueue()
523     {
524         synchronized (mInQueue)
525         {
526             while (mInQueue.isEmpty() && mIsInitialized)
527                 try
528                 {
529                     mInQueue.wait();
530                 }
531                 catch (InterruptedException ie) {}
532             
533             if (!mIsInitialized)
534                 return null;
535             
536             this.log(this, DEBUG, "Removed package from in queue: " + mInQueue.get(0).toString());
537             return (IPackage) mInQueue.remove(0);
538         }
539     }
540     
541     protected Object waitForReply(IPackage requestPackage)
542     {
543         Integer packageID = new Integer(requestPackage.getID());
544         
545         synchronized (mReplyMessages)
546         {
547             while (!mReplyMessages.containsKey(packageID))
548                 try
549                 {
550                     mReplyMessages.wait();
551                 }
552                 catch (InterruptedException ie) {}
553 
554             return mReplyMessages.remove(packageID);
555         }
556     }
557     
558     protected void doOnTunnelClose()
559     {
560         // Check to avoid multiple events
561         if (mIsInitialized)
562         {
563             this.fireTunnelClosedEvent();
564             this.finalizeTunnel();
565         }
566     }
567     
568     protected void doOnConnectionLost()
569     {
570         // Check to avoid multiple events
571         if (mIsInitialized)
572         {
573             this.fireConnectionLostEvent();
574             this.finalizeTunnel();
575         }
576     }
577     
578     protected void finalizeTunnel()
579     {
580         try
581         {
582             if (mIsInitialized)
583                 this.finalize();
584         }
585         catch (Exception e)
586         {
587             this.log(this, AbstractTunnel.ERROR, "Couldn't finalize tunnel", e);
588         }
589     }
590     
591     private int getSessionID(Object message)
592     {
593         int sessionID = -1;
594         if (!(message instanceof IService) &&
595             !(message instanceof ServiceProvider.IServiceProviderMessage) &&
596             !(message instanceof ILoginService.LoginMessage) &&
597             !(message instanceof ILoginService.LoginServiceReturnMessage))
598             sessionID = UserInfo.getCurrentUserInfo().getSessionID();
599             
600         return sessionID;
601     }
602     
603     // Nested classes ----------------------------------------------------------
604     
/**
 * A package as it is sent by an ITunnel. It wraps the actual message and
 * carries the extra information the ITunnel needs: a package ID, the ID of
 * the request it answers (if any), a package type and a session ID.
 */
public interface IPackage extends Serializable
{
    // Package types -------------------------------------------------------
    // (value 6 was once reserved for a CLOSE type, which is disabled)
    int PING = 0;
    int ERROR = 1;
    int INFO = 2;
    int REQUEST = 3;
    int REPLY = 4;
    int DATA = 5;

    // Identification ------------------------------------------------------

    /**
     * Returns the ID of this package. The ID must be a positive integer
     * and must be unique within the ITunnels that are used to send the
     * package.
     */
    int getID();

    /**
     * Returns the ID of the request package that this package is the reply
     * to, or -1 if this package is not a reply.
     */
    int getOriginID();

    /**
     * Sets the ID of the request package that this package replies to.
     */
    void setOriginID(int originID);

    // Type ----------------------------------------------------------------

    /**
     * Returns the type of this IPackage.
     */
    int getType();

    /**
     * Sets the type of this IPackage.
     */
    void setType(int type);

    // Payload -------------------------------------------------------------

    /**
     * Returns the message wrapped by this package.
     */
    Object getMessage();

    /**
     * Returns the session ID that the server should use to look up the
     * session for this package.
     */
    int getSessionID();
}
659     
660     public static class DefaultPackage implements IPackage
661     {
662         // Data members --------------------------------------------------------
663         protected int mID;
664         protected int mOriginID;
665         protected int mType;
666         protected Object mMessage;
667         protected int mSessionID;
668         
669         // Constructors --------------------------------------------------------
670         public DefaultPackage(int ID, Object message, int sessionID)
671         {
672             this(ID, -1, -1, message, sessionID);
673         }
674         
675         public DefaultPackage(int ID, int originID, int type, Object message, int sessionID)
676         {
677             mID = ID;
678             mOriginID = originID;
679             mType = type;
680             mMessage = message;
681             mSessionID = sessionID;
682         }
683         
684         // Superclass overrides ------------------------------------------------
685         public String toString()
686         {
687             StringBuffer stringBuffer = new StringBuffer();
688             stringBuffer.append('[');
689             stringBuffer.append(this.getID());
690             stringBuffer.append(' ');
691             stringBuffer.append(this.getOriginID());
692             stringBuffer.append(' ');
693             stringBuffer.append(this.getType());
694             stringBuffer.append("] ");
695             stringBuffer.append(this.getMessage());
696             
697             return stringBuffer.toString();
698         }
699         
700         // IPackage implementation ---------------------------------------------
701         
702         /***
703          * Returns the unique package ID for this package. The ID must be unique
704          * within the ITunnels that are used to send this package. IDs must be
705          * positive integers.
706          */
707         public int getID()
708         {
709             return mID;
710         }
711         
712         /***
713          * If this IPackage is the reply to a request message, the ID of the request
714          * IPackage is returned. Otherwise -1 is returned.
715          */
716         public int getOriginID()
717         {
718             return mOriginID;
719         }
720         
721         /***
722          * Sets the origin id of this IPackage.
723          */
724         public void setOriginID(int originID)
725         {
726             mOriginID = originID;
727         }
728         
729         /***
730          * Returns the type of this IPackage.
731          */
732         public int getType()
733         {
734             return mType;
735         }
736         
737         /***
738          * Sets the type of this IPackage.
739          */
740         public void setType(int type)
741         {
742             mType = type;
743         }
744         
745         /***
746          * Returns the encapsulated message.
747          */
748         public Object getMessage()
749         {
750             return mMessage;
751         }
752         
753         /*** Returns the session id that should be used by the server to find
754          * the session to use.
755          */
756         public int getSessionID()
757         {
758             return mSessionID;
759         }
760     }
761     
/**
 * Marker interface for finalize messages. Packages whose message
 * implements this interface are the only ones that finalize() leaves in
 * the out queue while the tunnel is shutting down.
 */
public interface IFinalizeMessage extends Serializable
{
}
765     
/**
 * Message that describes how a tunnel is set up, by naming the classes of
 * its codec and its packers.
 */
public interface IConnectionMessage extends Serializable
{
    // Methods -------------------------------------------------------------

    /** Returns the class name of the codec. */
    String getCodecClassName();

    /** Returns the class names of the packers. */
    String[] getPackerClassNames();
}
773     
/**
 * Message that reports a status code, OK or NOT_OK, together with a
 * descriptive text.
 */
public interface IStatusMessage extends Serializable
{
    // Status codes --------------------------------------------------------
    int OK = 0;
    int NOT_OK = 1;

    // Methods -------------------------------------------------------------

    /** Returns the status code of this message. */
    int getStatus();

    /** Returns the text of this message. */
    String getMessage();
}
785     
786     public static class DefaultStatusMessage implements IStatusMessage
787     {
788         // Data members --------------------------------------------------------
789         protected int mStatus;
790         protected String mMessage;
791         
792         // Constructors --------------------------------------------------------
793         public DefaultStatusMessage(int status, String message)
794         {
795             mStatus = status;
796             mMessage = message;
797         }
798         
799         // Superclass overrides ------------------------------------------------
800         public String toString()
801         {
802             StringBuffer stringBuffer = new StringBuffer();
803             stringBuffer.append('[');
804             switch (mStatus)
805             {
806                 case IStatusMessage.OK:
807                     stringBuffer.append("OK");
808                     break;
809                 case IStatusMessage.NOT_OK:
810                     stringBuffer.append("NOT_OK");
811                     break;
812             }
813             stringBuffer.append("] ");
814             stringBuffer.append(mMessage);
815             return stringBuffer.toString();
816         }
817         
818         // IStatusMessage implementation ---------------------------------------
819         public int getStatus()
820         {
821             return mStatus;
822         }
823         
824         public String getMessage()
825         {
826             return mMessage;
827         }
828     }
829     
830     public static final class DefaultConnectionMessage implements IConnectionMessage
831     {
832         // Data members --------------------------------------------------------
833         protected String mCodecClassName;
834         protected String[] mPackerClassNames;
835         
836         // Constructors --------------------------------------------------------
837         public DefaultConnectionMessage(String codecClassName)
838         {
839             this(codecClassName, new String[0]);
840         }
841         
842         public DefaultConnectionMessage(String codecClassName, String[] packerClassNames)
843         {
844             mCodecClassName = codecClassName;
845             mPackerClassNames = packerClassNames;
846         }
847         
848         // Superclass overrides ------------------------------------------------
849         public String toString()
850         {
851             StringBuffer buffer = new StringBuffer();
852             buffer.append("Codecs: ");
853             buffer.append(mCodecClassName);
854             buffer.append('\n');
855             buffer.append("Packers: ");
856             for (int i = 0; i < mPackerClassNames.length; i++)
857             {
858                 if (i > 0)
859                     buffer.append(", ");
860                 buffer.append(mPackerClassNames[i]);
861             }
862             
863             return buffer.toString();
864         }
865         
866         // IConnectionMessage implementation -----------------------------------
867         public String getCodecClassName()
868         {
869             return mCodecClassName;
870         }
871         
872         public String[] getPackerClassNames()
873         {
874             return mPackerClassNames;
875         }
876     }
877     
878     public static final class ErrorMessage implements Serializable
879     {
880         // Data members --------------------------------------------------------
881         protected String mErrorMessage;
882         protected Throwable mThrowable;
883         
884         // Constructors --------------------------------------------------------
885         public ErrorMessage(String errorMessage, Throwable throwable)
886         {
887             mErrorMessage = errorMessage;
888             mThrowable = throwable;
889         }
890         
891         // Superclass overrides ------------------------------------------------
892         public String toString()
893         {
894             return mErrorMessage;
895         }
896     }
897     
898     protected class InputThread extends Thread
899     {
900         // Constants -----------------------------------------------------------
901         public static final int INITIALIZATION_RETRY_TIME = 1;
902         
903         // Data members --------------------------------------------------------
904         protected boolean mIsStopped;
905         protected int mNumerOfExceptions;
906         protected InputStream mInputStream;
907         
908         // Constructors --------------------------------------------------------
909         public InputThread()
910         {
911             super("Server Input Thread");
912         }
913         
914         // Superclass overrides ------------------------------------------------
915         public void run()
916         {
917             log(this, DEBUG, "InputThread started");
918             
919             while (!mIsStopped)
920             {
921                 try
922                 {
923                     if (this.initializeInputStream())
924                     {
925                         IPackage inPackage = (IPackage) mCodec.decode(mInputStream);
926                             
927                         if (mIsStopped)
928                             break;
929                         
930                         if (inPackage != null)
931                         {
932                             log(this, DEBUG, "Input received on in stream");
933                             addToInQueue(inPackage);
934                         }
935                         else
936                             log(this, WARNING, "Codec returned null");
937                     }
938                     else
939                     {
940                         log(this, ERROR, "Coludn't initialize input stream, retrying in " + INITIALIZATION_RETRY_TIME + " seconds");
941                         sleep(INITIALIZATION_RETRY_TIME * 1000);
942                     }
943                 }
944                 catch (InterruptedException ie)
945                 {
946                     break;
947                 }
948                 catch (EOFException eofe)
949                 {
950                     if (!mIsStopped)
951                     {
952                         log(this, DEBUG, "Closing tunnel");
953                         doOnTunnelClose();
954                     }
955                 }
956                 catch (IOException ioe)
957                 {
958                     if (!mIsStopped)
959                     {
960                         log(this, ERROR, "An error occured", ioe);
961                         doOnConnectionLost();
962                     }
963                 }
964             }
965             
966             try
967             {
968                 if (mInputStream != null)
969                     mInputStream.close();
970             }
971             catch (IOException ioe)
972             {
973                 Log.print(this, "Couldn't close input stream", ioe);
974             }
975             log(this, DEBUG, "InputThread stopped");
976         }
977         
978         // Action methods ------------------------------------------------------
979         public void stopThread()
980         {
981             mIsStopped = true;
982             try
983             {
984                 if (mInputStream != null)
985                 {
986                     mInputStream.close();
987                     mInputStream = null;
988                 }
989             }
990             catch (IOException ioe)
991             {
992                 // This is ok. We try to close the input stream only to ensure
993                 // that the input thread does not hang on the input stream.
994             }
995         }
996         
997         // Help methods --------------------------------------------------------
998         protected boolean initializeInputStream()
999         {
1000             if (mInputStream != null)
1001                 return true;
1002             
1003             InputStream inputStream = getInputStream();
1004             if (inputStream == null)
1005                 return false;
1006             for (int i = 0; i < mPackers.size(); i++)
1007                 inputStream = ((ITunnelPacker) mPackers.get(i)).createInputStream(inputStream);
1008             
1009             mInputStream = inputStream;
1010             return true;
1011         }
1012     }
1013     
1014     protected class OutputThread extends Thread
1015     {
1016         // Constants -----------------------------------------------------------
1017         public static final int INITIALIZATION_RETRY_TIME = 1;
1018         
1019         // Data members --------------------------------------------------------
1020         protected boolean mIsStopped;
1021         protected int mNumerOfExceptions;
1022         protected OutputStream mOutputStream;
1023         
1024         // Constructors --------------------------------------------------------
1025         public OutputThread()
1026         {
1027             super("Server Output Thread");
1028         }
1029         
1030         // Superclass overrides ------------------------------------------------
1031         public void run()
1032         {
1033             log(this, DEBUG, "OutputThread started");
1034             
1035             while (!mIsStopped)
1036             {
1037                 try
1038                 {
1039                     if (this.initializeOutputStream())
1040                     {
1041                         IPackage outPackage = getNextFromOutQueue();
1042                         if (outPackage == null)
1043                             break;
1044                     
1045                         log(this,  DEBUG, "New package taken from out queue: " + outPackage.toString());
1046                         mCodec.encode(outPackage, mOutputStream);
1047                         log(this, DEBUG, "Package encoded and sent");
1048                     }
1049                     else
1050                     {
1051                         log(this, ERROR, "Coludn't initialize output stream, retrying in " + INITIALIZATION_RETRY_TIME + " seconds");
1052                         sleep(INITIALIZATION_RETRY_TIME * 1000);
1053                     }
1054                 }
1055                 catch (InterruptedException ie)
1056                 {
1057                     break;
1058                 }
1059                 catch (EOFException eofe)
1060                 {
1061                     if (!mIsStopped)
1062                     {
1063                         log(this, DEBUG, "Closing tunnel", eofe);
1064                         doOnTunnelClose();
1065                     }
1066                 }
1067                 catch (IOException ioe)
1068                 {
1069                     log(this, ERROR, "An error occured", ioe);
1070                     doOnConnectionLost();
1071                 }
1072             }
1073             
1074             try
1075             {
1076                 if (mOutputStream != null)
1077                     mOutputStream.close();
1078             }
1079             catch (IOException ioe)
1080             {
1081                 Log.print(this, "Couldn't close output stream", ioe);
1082             }
1083             log(this, DEBUG, "OutputThread stopped");
1084         }
1085         
1086         // Action methods ------------------------------------------------------
1087         public void stopThread()
1088         {
1089             mIsStopped = true;
1090         }
1091         
1092         // Help methods --------------------------------------------------------
1093         protected boolean initializeOutputStream()
1094         {
1095             if (mOutputStream != null)
1096                 return true;
1097             
1098             OutputStream outputStream = getOutputStream();
1099             if (outputStream == null)
1100                 return false;
1101             for (int i = 0; i < mPackers.size(); i++)
1102                 outputStream = ((ITunnelPacker) mPackers.get(i)).createOutputStream(outputStream);
1103             
1104             mOutputStream = outputStream;
1105             return true;
1106         }
1107     }
1108     
1109     protected class WorkerThread extends Thread
1110     {
1111         // Data members --------------------------------------------------------
1112         private boolean mIsStopped;
1113         
1114         // Constructors --------------------------------------------------------
1115         public WorkerThread()
1116         {
1117             super("Server Worker Thread");
1118         }
1119         
1120         // Superclass overrides ------------------------------------------------
1121         public void run()
1122         {
1123             log(this, DEBUG, "Worker thread started");
1124             
1125             while (!mIsStopped)
1126                 this.receiveInput(getNextFromInQueue());
1127 
1128             log(this, DEBUG, "Worker thread stopped");
1129         }
1130         
1131         // Action methods ------------------------------------------------------
1132         protected void stopWorkerThread()
1133         {
1134             mIsStopped = true;
1135         }
1136         
1137         // Help methods --------------------------------------------------------
1138         protected void receiveInput(IPackage inPackage)
1139         {
1140             if (inPackage == null)
1141                 return;
1142                 
1143             try
1144             {
1145 //                if ((inPackage.getType() == IPackage.DATA ||
1146 //                    inPackage.getType() == IPackage.REQUEST ||
1147 //                    inPackage.getType() == IPackage.PING) &&
1148                 if (!(inPackage.getMessage() instanceof IService) &&
1149                     !(inPackage.getMessage() instanceof ServiceProvider.IServiceProviderMessage) &&
1150                     !(inPackage.getMessage() instanceof ILoginService.LoginMessage) &&
1151                     !(inPackage.getMessage() instanceof ILoginService.LoginServiceReturnMessage))
1152                 {
1153                     ISession session = SessionHandler.getInstance().getSession(inPackage.getSessionID());
1154                     if (session != null)
1155                     {
1156                         UserInfo.setCurrentUserInfo(session.getUserInfo());
1157                     }
1158                     else
1159                     {
1160                         try
1161                         {
1162                             UserInfo.getCurrentUserInfo();
1163                         }
1164                         catch (Throwable t)
1165                         {
1166                             throw new SecurityException("No valid session found");
1167                         }
1168                     }
1169                 }
1170                     
1171                 
1172                 switch (inPackage.getType())
1173                 {
1174                     case IPackage.DATA:
1175                         this.handleDataPackage(inPackage);
1176                         break;
1177                     case IPackage.REQUEST:
1178                         this.handleRequestPackage(inPackage);
1179                         break;
1180                     case IPackage.REPLY:
1181                         this.handleReplyPackage(inPackage);
1182                         break;
1183                     case IPackage.INFO:
1184                         this.handleInfoPackage(inPackage);
1185                         break;
1186                     case IPackage.ERROR:
1187                         this.handleErrorPackage(inPackage);
1188                         break;
1189                     case IPackage.PING:
1190                         this.handlePingPackage(inPackage);
1191                         break;
1192                     default:
1193                         log(this, DEBUG, "Received unknown package");
1194                 }
1195             }
1196             catch (Throwable t)
1197             {
1198                 IPackage errorPackage = createPackage(new ErrorMessage("Error while processing request", t), -1);
1199                 errorPackage.setOriginID(inPackage.getID());
1200                 errorPackage.setType(IPackage.ERROR);
1201                 addToOutQueue(errorPackage);
1202             }
1203             finally
1204             {
1205                 UserInfo.setCurrentUserInfo(null);
1206             }
1207         }
1208 
1209         protected synchronized void handleDataPackage(IPackage inPackage)
1210         {
1211             log(this, DEBUG, "Received DATA package");
1212             this.consumePackage(inPackage);
1213         }
1214 
1215         protected synchronized void handleRequestPackage(IPackage inPackage)
1216         {
1217             log(this, DEBUG, "Received REQUEST package");
1218 
1219             for (int i = 0; i < mMessageConsumers.size(); i++)
1220                 if (((IMessageConsumer) mMessageConsumers.get(i)).acceptsMessage(inPackage.getMessage()))
1221                 {
1222                     Object replyMessage = ((IMessageConsumer) mMessageConsumers.get(i)).answerMessage(inPackage.getMessage());
1223                     IPackage replyPackage = createPackage(replyMessage, getSessionID(replyMessage));
1224                     replyPackage.setOriginID(inPackage.getID());
1225                     replyPackage.setType(IPackage.REPLY);
1226                     addToOutQueue(replyPackage);
1227                     /*RequestHandler requestHandler = new RequestHandler((IMessageConsumer) mMessageConsumers.get(i), inPackage);
1228                     requestHandler.start();*/
1229                     return;
1230                 }
1231 
1232             log(this, WARNING, "Package not accepted: " + inPackage.toString());
1233         }
1234 
1235         protected synchronized void handleReplyPackage(IPackage inPackage)
1236         {
1237             log(this, DEBUG, "Received REPLY package");
1238 
1239             synchronized (mReplyMessages)
1240             {
1241                 mReplyMessages.put(new Integer(inPackage.getOriginID()), inPackage.getMessage());
1242                 mReplyMessages.notifyAll();
1243             }
1244         }
1245 
1246         protected synchronized void handleInfoPackage(IPackage inPackage)
1247         {
1248             log(this, DEBUG, "Received INFO package");
1249 
1250             for (int i = 0; i < mStatusCallbacks.size(); i++)
1251                 ((IStatusCallback) mStatusCallbacks.get(i)).handleInfo(inPackage.getMessage().toString());
1252 
1253             log(this, INFO, "INFO: " + inPackage.getMessage().toString());
1254         }
1255 
1256         protected synchronized void handleErrorPackage(IPackage inPackage)
1257         {
1258             log(this, DEBUG, "Received ERROR package");
1259 
1260             for (int i = 0; i < mStatusCallbacks.size(); i++)
1261             {
1262                 if (inPackage.getMessage() instanceof ErrorMessage)
1263                 {
1264                     ErrorMessage errorMsg = (ErrorMessage) inPackage.getMessage();
1265                     ((IStatusCallback) mStatusCallbacks.get(i)).handleError(errorMsg.mErrorMessage, errorMsg.mThrowable);
1266                 }
1267                 else
1268                     ((IStatusCallback) mStatusCallbacks.get(i)).handleError(inPackage.getMessage().toString(), null);
1269             }
1270 
1271             if (inPackage.getMessage() instanceof ErrorMessage)
1272                 {
1273                     ErrorMessage errorMsg = (ErrorMessage) inPackage.getMessage();
1274                     log(this, ERROR, errorMsg.mErrorMessage, errorMsg.mThrowable);
1275                 }
1276                 else
1277                     log(this, ERROR, inPackage.getMessage().toString());
1278                     
1279             if (inPackage.getOriginID() != -1)
1280             {
1281                 synchronized (mReplyMessages)
1282                 {
1283                     mReplyMessages.put(new Integer(inPackage.getOriginID()), inPackage.getMessage());
1284                     mReplyMessages.notifyAll();
1285                 }
1286             }
1287         }
1288 
1289         protected synchronized void handlePingPackage(IPackage inPackage)
1290         {
1291             log(this, DEBUG, "Received PING package");
1292             IPackage replyPackage = createPackage("", getSessionID(""));
1293             replyPackage.setOriginID(inPackage.getID());
1294             replyPackage.setType(IPackage.REPLY);
1295             addToOutQueue(replyPackage);
1296         }
1297 
1298         protected synchronized void consumePackage(IPackage inPackage)
1299         {
1300             for (int i = 0; i < mMessageConsumers.size(); i++)
1301                 if (((IMessageConsumer) mMessageConsumers.get(i)).acceptsMessage(inPackage.getMessage()))
1302                 {
1303                     ((IMessageConsumer) mMessageConsumers.get(i)).consumeMessage(inPackage.getMessage());
1304                     return;
1305                 }
1306 
1307             log(this, WARNING, "Package not accepted: " + inPackage.toString());
1308         }
1309     }
1310 }