1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.caleigo.toolkit.tunnel;
20
21
22 import java.io.*;
23 import java.util.*;
24
25 import org.caleigo.security.*;
26 import org.caleigo.service.*;
27 import org.caleigo.toolkit.log.Log;
28
/**
 * Abstract class that implements most of the functionality needed by an ITunnel.
 * Only two methods need to be implemented by subclasses: getOutputStream and getInputStream.
 * Optionally the method createPackage can be overridden to provide custom implementations
 * of the IPackage interface.
 *
 * @author Mattias Hagstrand
 * @version 1.00
 *
37 *//*
38 *
39 * WHEN WHO WHY & WHAT
40 * -----------------------------------------------------------------------------
41 * 2002-07-01 Mattias Hagstrand Creation
42 */
43 public abstract class AbstractTunnel implements ITunnel
44 {
45
// Log levels understood by setLogLevel() and log(). log() drops every message
// whose level is numerically lower than the configured log level.
public static final int DEBUG = 0;
public static final int INFO = 1;
public static final int WARNING = 2;
public static final int ERROR = 3;

// Initial value of sNbrOfWorkerThreads (assigned in the static initializer below).
public static final int DEFAULT_NBR_OF_WORKER_THREADS = 5;


// Number of WorkerThreads created by initialize(); shared by all tunnels.
protected static int sNbrOfWorkerThreads;


// IMessageConsumers, consulted in list order by the worker threads.
protected List mMessageConsumers;
// Codec used to encode outgoing and decode incoming IPackages.
protected ITunnelCodec mCodec;
// ITunnelPackers that wrap the raw input and output streams.
protected List mPackers;
// IStatusCallbacks notified when INFO or ERROR packages arrive.
protected List mStatusCallbacks;
// Minimum level a message needs in order to be passed on by log().
protected int mLogLevel;
// IPackages waiting for the OutputThread; the list is also the monitor guarding the queue.
protected List mOutQueue;
// IPackages waiting for a WorkerThread; the list is also the monitor guarding the queue.
protected List mInQueue;
// ITunnelListeners notified when the tunnel closes or the connection is lost.
protected List mTunnelListeners;

// Set by initialize() and cleared by finalize().
private boolean mIsInitialized;
// Last package ID handed out by getNextPackageID().
protected int mCurrentPackageID;
protected WorkerThread[] mWorkerThreads;
protected OutputThread mOutputThread;
protected InputThread mInputThread;
// Maps the Integer ID of a request package to its reply message; also the
// monitor that waitForReply() waits on.
protected Map mReplyMessages;


static
{
    sNbrOfWorkerThreads = DEFAULT_NBR_OF_WORKER_THREADS;
}
78
79 /*** Sets the number of worker threads for all tunnels that are initilized
80 * after this methods has been called. Calling this method has no effect on
81 * tunnels that have allready been initilized.
82 */
83 public static void setNbrOfWorkerThreads(int nbrOfWorkerThreads)
84 {
85 if (nbrOfWorkerThreads < 1)
86 throw new IllegalArgumentException("nbrOfWorkerThreads < 1");
87
88 sNbrOfWorkerThreads = nbrOfWorkerThreads;
89 }
90
91
92
/**
 * Default constructor. Creates the tunnel with a new SerializeCodec as its
 * codec by delegating to the constructor that takes an ITunnelCodec.
 */
public AbstractTunnel()
{
    this(new SerializeCodec());
}
100
/**
 * Creates a tunnel that uses the given codec. All internal lists, queues and
 * the reply map start out empty and the log level is set to WARNING.
 *
 * @param codec the codec used to encode and decode packages.
 */
public AbstractTunnel(ITunnelCodec codec)
{
    // Configuration.
    mLogLevel = AbstractTunnel.WARNING;
    mCodec = codec;

    // Registered collaborators.
    mPackers = new ArrayList();
    mMessageConsumers = new ArrayList();
    mStatusCallbacks = new ArrayList();
    mTunnelListeners = new ArrayList();

    // Package queues and pending replies.
    mInQueue = new ArrayList();
    mOutQueue = new ArrayList();
    mReplyMessages = new HashMap();
}
114
115
116
117 /***
118 * Initializes the ITunnel. This method must be called before any messages
119 * can be sent. After this method has been called it is not possible to
120 * set the ITunnelCodec or to add or remove ITunnelPackers. Subclasses of
121 * AbstractTunnel must call super.initialize(). Successful initialization
122 * recuires that the methods getInStream and getOutStream does not return
123 * <code>null</code>.
124 */
125 public synchronized void initialize()
126 throws IllegalStateException, TunnelException
127 {
128 if (mIsInitialized)
129 throw new IllegalStateException("The tunnel is initialized");
130
131 mIsInitialized = true;
132
133 mOutputThread = new OutputThread();
134 mOutputThread.start();
135 mInputThread = new InputThread();
136 mInputThread.start();
137
138
139 mWorkerThreads = new WorkerThread[sNbrOfWorkerThreads];
140 for (int i = 0; i < mWorkerThreads.length; i++)
141 {
142 mWorkerThreads[i] = new WorkerThread();
143 mWorkerThreads[i].start();
144 }
145 }
146
147 /***
148 * Performs cleanup for this ITunnel. No messages can be sent after this
149 * method has been called. This method will send all messages in the message
150 * queue before closing down. Subclasses of AbstractTunnel must call super.finalize()
151 * before they do any finalization of their own.
152 */
153 public synchronized void finalize()
154 throws IllegalStateException, TunnelException
155 {
156 if (!mIsInitialized)
157 throw new IllegalStateException("The tunnel is not initialized");
158
159
160
161
162 mInputThread.stopThread();
163
164
165
166
167
168
169
170
171
172
173 synchronized (mOutQueue)
174 {
175
176 Iterator packages = mOutQueue.iterator();
177 while (packages.hasNext())
178 {
179 IPackage currentPackage = (IPackage) packages.next();
180 if (!(currentPackage.getMessage() instanceof IFinalizeMessage))
181 packages.remove();
182 }
183
184
185 try
186 {
187 while (mOutQueue.size() > 0)
188 mOutQueue.wait();
189 }
190 catch (InterruptedException ie)
191 {
192 Log.printError(this, "Interrupted while waiting for finalize message to be sent", ie);
193 }
194 }
195
196 mIsInitialized = false;
197
198
199 for (int i = 0; i < mWorkerThreads.length; i++)
200 mWorkerThreads[i].stopWorkerThread();
201
202 synchronized (mInQueue)
203 {
204 mInQueue.notifyAll();
205 }
206
207
208 mOutputThread.stopThread();
209
210 synchronized (mOutQueue)
211 {
212 mOutQueue.notifyAll();
213 }
214
215 this.fireTunnelClosedEvent();
216 }
217
/**
 * Returns <code>true</code> if this tunnel is active, that is, if it is
 * correctly initialized. The value is the flag that initialize() sets and
 * finalize() clears. If this method returns <code>true</code> then a call to
 * finalize must not throw an IllegalStateException.
 *
 * @return <code>true</code> while the tunnel is initialized.
 */
public synchronized boolean isActive()
{
    return mIsInitialized;
}
226
227 /***
228 * Sends a ping message over this ITunnel. This method is typically used to
229 * check that the communication is working.
230 *
231 * @return the ping time in milli seconds.
232 */
233 public int ping()
234 throws TunnelException
235 {
236 if (!mIsInitialized)
237 throw new TunnelException("The tunnel is not initialized");
238
239
240 IPackage pingPackage = this.createPackage("", this.getSessionID(""));
241 pingPackage.setType(IPackage.PING);
242
243
244 long startTime = System.currentTimeMillis();
245 this.addToOutQueue(pingPackage);
246 this.waitForReply(pingPackage);
247
248
249 return (int) (System.currentTimeMillis() - startTime);
250 }
251
252 /***
253 * Sends a message over this ITunnel. This method returns after the message
254 * has been sent.
255 */
256 public void send(Object message)
257 throws TunnelException
258 {
259 if (!mIsInitialized)
260 throw new TunnelException("The tunnel is not initialized");
261
262 IPackage outPackage = this.createPackage(message, this.getSessionID(message));
263 outPackage.setType(IPackage.DATA);
264 this.addToOutQueue(outPackage);
265 }
266
267 /***
268 * Sends a request/reply message over this ITunnel. This method blocks until
269 * the reply is received.
270 */
271 public Object ask(Object message)
272 throws TunnelException
273 {
274 if (!mIsInitialized)
275 throw new TunnelException("The tunnel is not initialized");
276
277 IPackage outPackage = this.createPackage(message, this.getSessionID(message));
278 outPackage.setType(IPackage.REQUEST);
279 this.addToOutQueue(outPackage);
280 return this.waitForReply(outPackage);
281 }
282
283 /***
284 * Adds the consumer to the end of this ITunnels message consumer list.
285 */
286 public synchronized void addMessageConsumer(IMessageConsumer consumer)
287 {
288 if (!mMessageConsumers.contains(consumer))
289 mMessageConsumers.add(consumer);
290 }
291
/**
 * Removes the consumer from this ITunnel's message consumer list.
 *
 * @param consumer the consumer to remove.
 */
public synchronized void remomveMessageConsumer(IMessageConsumer consumer)
{
    mMessageConsumers.remove(consumer);
}
299
/**
 * Sets the ITunnelCodec that should be used by this ITunnel. The codec can
 * only be changed while the tunnel is not initialized.
 *
 * @param codec the codec to use.
 * @throws IllegalStateException if the tunnel is initialized.
 */
public synchronized void setCodec(ITunnelCodec codec)
throws IllegalStateException
{
    if (mIsInitialized)
        throw new IllegalStateException("The tunnel is initialized");

    mCodec = codec;
}
311
/**
 * Adds the packer to the end of this ITunnel's packer list. Packers can only
 * be added while the tunnel is not initialized.
 *
 * @param packer the packer to add.
 * @throws IllegalStateException if the tunnel is initialized.
 */
public synchronized void addPacker(ITunnelPacker packer)
throws IllegalStateException
{
    if (mIsInitialized)
        throw new IllegalStateException("The tunnel is initialized");

    mPackers.add(packer);
}
323
/**
 * Removes the packer from this ITunnel's packer list. Packers can only be
 * removed while the tunnel is not initialized.
 *
 * @param packer the packer to remove.
 * @throws IllegalStateException if the tunnel is initialized.
 */
public synchronized void remomvePacker(ITunnelPacker packer)
throws IllegalStateException
{
    if (mIsInitialized)
        throw new IllegalStateException("The tunnel is initialized");

    mPackers.remove(packer);
}
335
336 /***
337 * Adds the callback to the list of IStatusCallbacks that receives status
338 * messages from this ITunnel.
339 */
340 public synchronized void addStatusCallback(IStatusCallback callback)
341 {
342 if (!mStatusCallbacks.contains(callback))
343 mStatusCallbacks.add(callback);
344 }
345
/**
 * Removes the callback from the list of IStatusCallbacks.
 *
 * @param callback the callback to remove.
 */
public synchronized void removeStatusCallback(IStatusCallback callback)
{
    mStatusCallbacks.remove(callback);
}
353
/**
 * Sets the log level for this ITunnel. Messages logged with a level lower
 * than this value are suppressed by log().
 *
 * @param level the new log level.
 */
public void setLogLevel(int level)
{
    mLogLevel = level;
}
361
/**
 * Gets the log level for this ITunnel.
 *
 * @return the current log level.
 */
public int getLogLevel()
{
    return mLogLevel;
}
369
370 public void addTunnelListener(ITunnelListener listener)
371 {
372 if (!mTunnelListeners.contains(listener))
373 mTunnelListeners.add(listener);
374 }
375
/**
 * Removes the listener from the list of ITunnelListeners.
 *
 * @param listener the listener to remove.
 */
public void removeTunnelListener(ITunnelListener listener)
{
    mTunnelListeners.remove(listener);
}
380
/**
 * Returns a human-readable text that describes the ITunnel. This
 * implementation returns a fixed placeholder text.
 *
 * @return the description.
 */
public String getDescription()
{
    return "[no description avilable]";
}
387
388
389
/**
 * Creates an IPackage that contains the provided message. This implementation
 * returns a DefaultPackage with the next package ID of this tunnel.
 *
 * @param message the message to wrap.
 * @param sessionID the session ID to store in the package.
 * @return the new package.
 */
public IPackage createPackage(Object message, int sessionID)
{
    return new DefaultPackage(this.getNextPackageID(), message, sessionID);
}
397
398
399
/**
 * Returns an OutputStream that should be used to send messages. The output
 * thread wraps the returned stream with every registered ITunnelPacker, and
 * retries after a delay while this method returns <code>null</code>.
 *
 * @return the stream to write to, or <code>null</code> if none is available yet.
 */
protected abstract OutputStream getOutputStream();
404
/**
 * Returns an InputStream that should be used to receive messages. The input
 * thread wraps the returned stream with every registered ITunnelPacker, and
 * retries after a delay while this method returns <code>null</code>.
 *
 * @return the stream to read from, or <code>null</code> if none is available yet.
 */
protected abstract InputStream getInputStream();
409
410
411 protected IConnectionMessage createConnectionMessage()
412 {
413
414 String codecClassName = mCodec.getClass().getName();
415 String[] packerClassNames = new String[mPackers.size()];
416 for (int i = 0; i < packerClassNames.length; i++)
417 packerClassNames[i] = mPackers.get(i).getClass().getName();
418
419 return new DefaultConnectionMessage(codecClassName, packerClassNames);
420 }
421
422
/**
 * Logs a message without an attached throwable; delegates to the four-argument
 * log method with <code>null</code> as the throwable.
 *
 * @param source the object reporting the message.
 * @param logLevel one of DEBUG, INFO, WARNING or ERROR.
 * @param message the text to log.
 */
protected void log(Object source, int logLevel, String message)
{
    this.log(source, logLevel, message, null);
}
427
428 protected void log(Object source, int logLevel, String message, Throwable e)
429 {
430 if (this.getLogLevel() > logLevel)
431 return;
432
433 switch (logLevel)
434 {
435 case DEBUG:
436 case INFO:
437 if (e != null)
438 Log.print(source, message, e);
439 else
440 Log.print(source, message);
441 break;
442 case WARNING:
443 if (e != null)
444 Log.printWarning(source, message, e);
445 else
446 Log.printWarning(source, message);
447 break;
448 case ERROR:
449 if (e != null)
450 Log.printError(source, message, e);
451 else
452 Log.printError(source, message);
453 break;
454 }
455 }
456
457 protected void fireTunnelClosedEvent()
458 {
459 TunnelEvent event = new TunnelEvent(this, TunnelEvent.TUNNEL_CLOSED);
460 for (int i = 0; i < mTunnelListeners.size(); i++)
461 ((ITunnelListener) mTunnelListeners.get(i)).tunnelClosed(event);
462 }
463
464 protected void fireConnectionLostEvent()
465 {
466 TunnelEvent event = new TunnelEvent(this, TunnelEvent.CONNECTION_LOST);
467 for (int i = 0; i < mTunnelListeners.size(); i++)
468 ((ITunnelListener) mTunnelListeners.get(i)).connectionLost(event);
469 }
470
471 protected synchronized int getNextPackageID()
472 {
473 if (mCurrentPackageID < Integer.MAX_VALUE)
474 mCurrentPackageID++;
475 else
476 mCurrentPackageID = 1;
477
478 return mCurrentPackageID;
479 }
480
481 protected void addToOutQueue(IPackage outPackage)
482 {
483 synchronized (mOutQueue)
484 {
485 this.log(this, AbstractTunnel.DEBUG, "Added package to out queue: " + outPackage.toString());
486 mOutQueue.add(outPackage);
487 mOutQueue.notifyAll();
488 }
489 }
490
491 protected IPackage getNextFromOutQueue()
492 {
493 synchronized (mOutQueue)
494 {
495 while (mOutQueue.isEmpty() && mIsInitialized)
496 try
497 {
498 mOutQueue.wait();
499 }
500 catch (InterruptedException ie) {}
501
502 if (!mIsInitialized)
503 return null;
504
505 IPackage thePackage = (IPackage) mOutQueue.remove(0);
506 this.log(this, AbstractTunnel.DEBUG, "Removed package from out queue: " + thePackage.toString());
507 mOutQueue.notifyAll();
508 return thePackage;
509 }
510 }
511
512 protected void addToInQueue(IPackage inPackage)
513 {
514 synchronized (mInQueue)
515 {
516 this.log(this, AbstractTunnel.DEBUG, "Added package to in queue: " + inPackage.toString());
517 mInQueue.add(inPackage);
518 mInQueue.notifyAll();
519 }
520 }
521
522 protected IPackage getNextFromInQueue()
523 {
524 synchronized (mInQueue)
525 {
526 while (mInQueue.isEmpty() && mIsInitialized)
527 try
528 {
529 mInQueue.wait();
530 }
531 catch (InterruptedException ie) {}
532
533 if (!mIsInitialized)
534 return null;
535
536 this.log(this, DEBUG, "Removed package from in queue: " + mInQueue.get(0).toString());
537 return (IPackage) mInQueue.remove(0);
538 }
539 }
540
541 protected Object waitForReply(IPackage requestPackage)
542 {
543 Integer packageID = new Integer(requestPackage.getID());
544
545 synchronized (mReplyMessages)
546 {
547 while (!mReplyMessages.containsKey(packageID))
548 try
549 {
550 mReplyMessages.wait();
551 }
552 catch (InterruptedException ie) {}
553
554 return mReplyMessages.remove(packageID);
555 }
556 }
557
558 protected void doOnTunnelClose()
559 {
560
561 if (mIsInitialized)
562 {
563 this.fireTunnelClosedEvent();
564 this.finalizeTunnel();
565 }
566 }
567
568 protected void doOnConnectionLost()
569 {
570
571 if (mIsInitialized)
572 {
573 this.fireConnectionLostEvent();
574 this.finalizeTunnel();
575 }
576 }
577
578 protected void finalizeTunnel()
579 {
580 try
581 {
582 if (mIsInitialized)
583 this.finalize();
584 }
585 catch (Exception e)
586 {
587 this.log(this, AbstractTunnel.ERROR, "Couldn't finalize tunnel", e);
588 }
589 }
590
591 private int getSessionID(Object message)
592 {
593 int sessionID = -1;
594 if (!(message instanceof IService) &&
595 !(message instanceof ServiceProvider.IServiceProviderMessage) &&
596 !(message instanceof ILoginService.LoginMessage) &&
597 !(message instanceof ILoginService.LoginServiceReturnMessage))
598 sessionID = UserInfo.getCurrentUserInfo().getSessionID();
599
600 return sessionID;
601 }
602
603
604
/**
 * An IPackage represents a package that is sent by an ITunnel. It encapsulates
 * the actual message and provides extra information needed by the ITunnel.
 */
public static interface IPackage extends Serializable
{

    // Package types; the worker threads dispatch incoming packages on these values.
    public static final int PING = 0;
    public static final int ERROR = 1;
    public static final int INFO = 2;
    public static final int REQUEST = 3;
    public static final int REPLY = 4;
    public static final int DATA = 5;




    /**
     * Returns the unique package ID for this package. The ID must be unique
     * within the ITunnels that are used to send this package. IDs must be
     * positive integers.
     */
    public int getID();

    /**
     * If this IPackage is the reply to a request message, the ID of the request
     * IPackage is returned. Otherwise -1 is returned.
     */
    public int getOriginID();

    /**
     * Sets the origin id of this IPackage.
     */
    public void setOriginID(int originID);

    /**
     * Returns the type of this IPackage, one of the type constants of this
     * interface.
     */
    public int getType();

    /**
     * Sets the type of this IPackage.
     */
    public void setType(int type);

    /**
     * Returns the encapsulated message.
     */
    public Object getMessage();

    /**
     * Returns the session id that should be used by the server to find
     * the session to use.
     */
    public int getSessionID();
}
659
660 public static class DefaultPackage implements IPackage
661 {
662
663 protected int mID;
664 protected int mOriginID;
665 protected int mType;
666 protected Object mMessage;
667 protected int mSessionID;
668
669
670 public DefaultPackage(int ID, Object message, int sessionID)
671 {
672 this(ID, -1, -1, message, sessionID);
673 }
674
675 public DefaultPackage(int ID, int originID, int type, Object message, int sessionID)
676 {
677 mID = ID;
678 mOriginID = originID;
679 mType = type;
680 mMessage = message;
681 mSessionID = sessionID;
682 }
683
684
685 public String toString()
686 {
687 StringBuffer stringBuffer = new StringBuffer();
688 stringBuffer.append('[');
689 stringBuffer.append(this.getID());
690 stringBuffer.append(' ');
691 stringBuffer.append(this.getOriginID());
692 stringBuffer.append(' ');
693 stringBuffer.append(this.getType());
694 stringBuffer.append("] ");
695 stringBuffer.append(this.getMessage());
696
697 return stringBuffer.toString();
698 }
699
700
701
702 /***
703 * Returns the unique package ID for this package. The ID must be unique
704 * within the ITunnels that are used to send this package. IDs must be
705 * positive integers.
706 */
707 public int getID()
708 {
709 return mID;
710 }
711
712 /***
713 * If this IPackage is the reply to a request message, the ID of the request
714 * IPackage is returned. Otherwise -1 is returned.
715 */
716 public int getOriginID()
717 {
718 return mOriginID;
719 }
720
721 /***
722 * Sets the origin id of this IPackage.
723 */
724 public void setOriginID(int originID)
725 {
726 mOriginID = originID;
727 }
728
729 /***
730 * Returns the type of this IPackage.
731 */
732 public int getType()
733 {
734 return mType;
735 }
736
737 /***
738 * Sets the type of this IPackage.
739 */
740 public void setType(int type)
741 {
742 mType = type;
743 }
744
745 /***
746 * Returns the encapsulated message.
747 */
748 public Object getMessage()
749 {
750 return mMessage;
751 }
752
753 /*** Returns the session id that should be used by the server to find
754 * the session to use.
755 */
756 public int getSessionID()
757 {
758 return mSessionID;
759 }
760 }
761
/**
 * Marker interface for messages that announce the shutdown of a tunnel.
 * Packages carrying such a message are the only ones finalize() leaves in the
 * out queue.
 */
public static interface IFinalizeMessage extends Serializable
{
}
765
/**
 * Message that names the codec class and the packer classes of a tunnel.
 */
public static interface IConnectionMessage extends Serializable
{

    /** Returns the class name of the codec. */
    public String getCodecClassName();

    /** Returns the class names of the packers. */
    public String[] getPackerClassNames();
}
773
/**
 * Message that carries a status code (OK or NOT_OK) and a descriptive text.
 */
public static interface IStatusMessage extends Serializable
{

    // Status codes returned by getStatus().
    public static final int OK = 0;
    public static final int NOT_OK = 1;


    /** Returns the status code. */
    public int getStatus();

    /** Returns the text of this status message. */
    public String getMessage();
}
785
786 public static class DefaultStatusMessage implements IStatusMessage
787 {
788
789 protected int mStatus;
790 protected String mMessage;
791
792
793 public DefaultStatusMessage(int status, String message)
794 {
795 mStatus = status;
796 mMessage = message;
797 }
798
799
800 public String toString()
801 {
802 StringBuffer stringBuffer = new StringBuffer();
803 stringBuffer.append('[');
804 switch (mStatus)
805 {
806 case IStatusMessage.OK:
807 stringBuffer.append("OK");
808 break;
809 case IStatusMessage.NOT_OK:
810 stringBuffer.append("NOT_OK");
811 break;
812 }
813 stringBuffer.append("] ");
814 stringBuffer.append(mMessage);
815 return stringBuffer.toString();
816 }
817
818
819 public int getStatus()
820 {
821 return mStatus;
822 }
823
824 public String getMessage()
825 {
826 return mMessage;
827 }
828 }
829
830 public static final class DefaultConnectionMessage implements IConnectionMessage
831 {
832
833 protected String mCodecClassName;
834 protected String[] mPackerClassNames;
835
836
837 public DefaultConnectionMessage(String codecClassName)
838 {
839 this(codecClassName, new String[0]);
840 }
841
842 public DefaultConnectionMessage(String codecClassName, String[] packerClassNames)
843 {
844 mCodecClassName = codecClassName;
845 mPackerClassNames = packerClassNames;
846 }
847
848
849 public String toString()
850 {
851 StringBuffer buffer = new StringBuffer();
852 buffer.append("Codecs: ");
853 buffer.append(mCodecClassName);
854 buffer.append('\n');
855 buffer.append("Packers: ");
856 for (int i = 0; i < mPackerClassNames.length; i++)
857 {
858 if (i > 0)
859 buffer.append(", ");
860 buffer.append(mPackerClassNames[i]);
861 }
862
863 return buffer.toString();
864 }
865
866
867 public String getCodecClassName()
868 {
869 return mCodecClassName;
870 }
871
872 public String[] getPackerClassNames()
873 {
874 return mPackerClassNames;
875 }
876 }
877
/**
 * Serializable message that carries an error text together with the throwable
 * that caused it. The worker threads send it in ERROR packages.
 */
public static final class ErrorMessage implements Serializable
{

    protected String mErrorMessage;
    protected Throwable mThrowable;


    /**
     * Creates an error message with the given text and throwable.
     */
    public ErrorMessage(String errorMessage, Throwable throwable)
    {
        mErrorMessage = errorMessage;
        mThrowable = throwable;
    }


    /**
     * Returns the error text only; the throwable is not included.
     */
    public String toString()
    {
        return mErrorMessage;
    }
}
897
898 protected class InputThread extends Thread
899 {
900
901 public static final int INITIALIZATION_RETRY_TIME = 1;
902
903
904 protected boolean mIsStopped;
905 protected int mNumerOfExceptions;
906 protected InputStream mInputStream;
907
908
909 public InputThread()
910 {
911 super("Server Input Thread");
912 }
913
914
915 public void run()
916 {
917 log(this, DEBUG, "InputThread started");
918
919 while (!mIsStopped)
920 {
921 try
922 {
923 if (this.initializeInputStream())
924 {
925 IPackage inPackage = (IPackage) mCodec.decode(mInputStream);
926
927 if (mIsStopped)
928 break;
929
930 if (inPackage != null)
931 {
932 log(this, DEBUG, "Input received on in stream");
933 addToInQueue(inPackage);
934 }
935 else
936 log(this, WARNING, "Codec returned null");
937 }
938 else
939 {
940 log(this, ERROR, "Coludn't initialize input stream, retrying in " + INITIALIZATION_RETRY_TIME + " seconds");
941 sleep(INITIALIZATION_RETRY_TIME * 1000);
942 }
943 }
944 catch (InterruptedException ie)
945 {
946 break;
947 }
948 catch (EOFException eofe)
949 {
950 if (!mIsStopped)
951 {
952 log(this, DEBUG, "Closing tunnel");
953 doOnTunnelClose();
954 }
955 }
956 catch (IOException ioe)
957 {
958 if (!mIsStopped)
959 {
960 log(this, ERROR, "An error occured", ioe);
961 doOnConnectionLost();
962 }
963 }
964 }
965
966 try
967 {
968 if (mInputStream != null)
969 mInputStream.close();
970 }
971 catch (IOException ioe)
972 {
973 Log.print(this, "Couldn't close input stream", ioe);
974 }
975 log(this, DEBUG, "InputThread stopped");
976 }
977
978
979 public void stopThread()
980 {
981 mIsStopped = true;
982 try
983 {
984 if (mInputStream != null)
985 {
986 mInputStream.close();
987 mInputStream = null;
988 }
989 }
990 catch (IOException ioe)
991 {
992
993
994 }
995 }
996
997
998 protected boolean initializeInputStream()
999 {
1000 if (mInputStream != null)
1001 return true;
1002
1003 InputStream inputStream = getInputStream();
1004 if (inputStream == null)
1005 return false;
1006 for (int i = 0; i < mPackers.size(); i++)
1007 inputStream = ((ITunnelPacker) mPackers.get(i)).createInputStream(inputStream);
1008
1009 mInputStream = inputStream;
1010 return true;
1011 }
1012 }
1013
1014 protected class OutputThread extends Thread
1015 {
1016
1017 public static final int INITIALIZATION_RETRY_TIME = 1;
1018
1019
1020 protected boolean mIsStopped;
1021 protected int mNumerOfExceptions;
1022 protected OutputStream mOutputStream;
1023
1024
1025 public OutputThread()
1026 {
1027 super("Server Output Thread");
1028 }
1029
1030
1031 public void run()
1032 {
1033 log(this, DEBUG, "OutputThread started");
1034
1035 while (!mIsStopped)
1036 {
1037 try
1038 {
1039 if (this.initializeOutputStream())
1040 {
1041 IPackage outPackage = getNextFromOutQueue();
1042 if (outPackage == null)
1043 break;
1044
1045 log(this, DEBUG, "New package taken from out queue: " + outPackage.toString());
1046 mCodec.encode(outPackage, mOutputStream);
1047 log(this, DEBUG, "Package encoded and sent");
1048 }
1049 else
1050 {
1051 log(this, ERROR, "Coludn't initialize output stream, retrying in " + INITIALIZATION_RETRY_TIME + " seconds");
1052 sleep(INITIALIZATION_RETRY_TIME * 1000);
1053 }
1054 }
1055 catch (InterruptedException ie)
1056 {
1057 break;
1058 }
1059 catch (EOFException eofe)
1060 {
1061 if (!mIsStopped)
1062 {
1063 log(this, DEBUG, "Closing tunnel", eofe);
1064 doOnTunnelClose();
1065 }
1066 }
1067 catch (IOException ioe)
1068 {
1069 log(this, ERROR, "An error occured", ioe);
1070 doOnConnectionLost();
1071 }
1072 }
1073
1074 try
1075 {
1076 if (mOutputStream != null)
1077 mOutputStream.close();
1078 }
1079 catch (IOException ioe)
1080 {
1081 Log.print(this, "Couldn't close output stream", ioe);
1082 }
1083 log(this, DEBUG, "OutputThread stopped");
1084 }
1085
1086
1087 public void stopThread()
1088 {
1089 mIsStopped = true;
1090 }
1091
1092
1093 protected boolean initializeOutputStream()
1094 {
1095 if (mOutputStream != null)
1096 return true;
1097
1098 OutputStream outputStream = getOutputStream();
1099 if (outputStream == null)
1100 return false;
1101 for (int i = 0; i < mPackers.size(); i++)
1102 outputStream = ((ITunnelPacker) mPackers.get(i)).createOutputStream(outputStream);
1103
1104 mOutputStream = outputStream;
1105 return true;
1106 }
1107 }
1108
1109 protected class WorkerThread extends Thread
1110 {
1111
1112 private boolean mIsStopped;
1113
1114
1115 public WorkerThread()
1116 {
1117 super("Server Worker Thread");
1118 }
1119
1120
1121 public void run()
1122 {
1123 log(this, DEBUG, "Worker thread started");
1124
1125 while (!mIsStopped)
1126 this.receiveInput(getNextFromInQueue());
1127
1128 log(this, DEBUG, "Worker thread stopped");
1129 }
1130
1131
1132 protected void stopWorkerThread()
1133 {
1134 mIsStopped = true;
1135 }
1136
1137
1138 protected void receiveInput(IPackage inPackage)
1139 {
1140 if (inPackage == null)
1141 return;
1142
1143 try
1144 {
1145
1146
1147
1148 if (!(inPackage.getMessage() instanceof IService) &&
1149 !(inPackage.getMessage() instanceof ServiceProvider.IServiceProviderMessage) &&
1150 !(inPackage.getMessage() instanceof ILoginService.LoginMessage) &&
1151 !(inPackage.getMessage() instanceof ILoginService.LoginServiceReturnMessage))
1152 {
1153 ISession session = SessionHandler.getInstance().getSession(inPackage.getSessionID());
1154 if (session != null)
1155 {
1156 UserInfo.setCurrentUserInfo(session.getUserInfo());
1157 }
1158 else
1159 {
1160 try
1161 {
1162 UserInfo.getCurrentUserInfo();
1163 }
1164 catch (Throwable t)
1165 {
1166 throw new SecurityException("No valid session found");
1167 }
1168 }
1169 }
1170
1171
1172 switch (inPackage.getType())
1173 {
1174 case IPackage.DATA:
1175 this.handleDataPackage(inPackage);
1176 break;
1177 case IPackage.REQUEST:
1178 this.handleRequestPackage(inPackage);
1179 break;
1180 case IPackage.REPLY:
1181 this.handleReplyPackage(inPackage);
1182 break;
1183 case IPackage.INFO:
1184 this.handleInfoPackage(inPackage);
1185 break;
1186 case IPackage.ERROR:
1187 this.handleErrorPackage(inPackage);
1188 break;
1189 case IPackage.PING:
1190 this.handlePingPackage(inPackage);
1191 break;
1192 default:
1193 log(this, DEBUG, "Received unknown package");
1194 }
1195 }
1196 catch (Throwable t)
1197 {
1198 IPackage errorPackage = createPackage(new ErrorMessage("Error while processing request", t), -1);
1199 errorPackage.setOriginID(inPackage.getID());
1200 errorPackage.setType(IPackage.ERROR);
1201 addToOutQueue(errorPackage);
1202 }
1203 finally
1204 {
1205 UserInfo.setCurrentUserInfo(null);
1206 }
1207 }
1208
1209 protected synchronized void handleDataPackage(IPackage inPackage)
1210 {
1211 log(this, DEBUG, "Received DATA package");
1212 this.consumePackage(inPackage);
1213 }
1214
1215 protected synchronized void handleRequestPackage(IPackage inPackage)
1216 {
1217 log(this, DEBUG, "Received REQUEST package");
1218
1219 for (int i = 0; i < mMessageConsumers.size(); i++)
1220 if (((IMessageConsumer) mMessageConsumers.get(i)).acceptsMessage(inPackage.getMessage()))
1221 {
1222 Object replyMessage = ((IMessageConsumer) mMessageConsumers.get(i)).answerMessage(inPackage.getMessage());
1223 IPackage replyPackage = createPackage(replyMessage, getSessionID(replyMessage));
1224 replyPackage.setOriginID(inPackage.getID());
1225 replyPackage.setType(IPackage.REPLY);
1226 addToOutQueue(replyPackage);
1227
1228
1229 return;
1230 }
1231
1232 log(this, WARNING, "Package not accepted: " + inPackage.toString());
1233 }
1234
1235 protected synchronized void handleReplyPackage(IPackage inPackage)
1236 {
1237 log(this, DEBUG, "Received REPLY package");
1238
1239 synchronized (mReplyMessages)
1240 {
1241 mReplyMessages.put(new Integer(inPackage.getOriginID()), inPackage.getMessage());
1242 mReplyMessages.notifyAll();
1243 }
1244 }
1245
1246 protected synchronized void handleInfoPackage(IPackage inPackage)
1247 {
1248 log(this, DEBUG, "Received INFO package");
1249
1250 for (int i = 0; i < mStatusCallbacks.size(); i++)
1251 ((IStatusCallback) mStatusCallbacks.get(i)).handleInfo(inPackage.getMessage().toString());
1252
1253 log(this, INFO, "INFO: " + inPackage.getMessage().toString());
1254 }
1255
1256 protected synchronized void handleErrorPackage(IPackage inPackage)
1257 {
1258 log(this, DEBUG, "Received ERROR package");
1259
1260 for (int i = 0; i < mStatusCallbacks.size(); i++)
1261 {
1262 if (inPackage.getMessage() instanceof ErrorMessage)
1263 {
1264 ErrorMessage errorMsg = (ErrorMessage) inPackage.getMessage();
1265 ((IStatusCallback) mStatusCallbacks.get(i)).handleError(errorMsg.mErrorMessage, errorMsg.mThrowable);
1266 }
1267 else
1268 ((IStatusCallback) mStatusCallbacks.get(i)).handleError(inPackage.getMessage().toString(), null);
1269 }
1270
1271 if (inPackage.getMessage() instanceof ErrorMessage)
1272 {
1273 ErrorMessage errorMsg = (ErrorMessage) inPackage.getMessage();
1274 log(this, ERROR, errorMsg.mErrorMessage, errorMsg.mThrowable);
1275 }
1276 else
1277 log(this, ERROR, inPackage.getMessage().toString());
1278
1279 if (inPackage.getOriginID() != -1)
1280 {
1281 synchronized (mReplyMessages)
1282 {
1283 mReplyMessages.put(new Integer(inPackage.getOriginID()), inPackage.getMessage());
1284 mReplyMessages.notifyAll();
1285 }
1286 }
1287 }
1288
1289 protected synchronized void handlePingPackage(IPackage inPackage)
1290 {
1291 log(this, DEBUG, "Received PING package");
1292 IPackage replyPackage = createPackage("", getSessionID(""));
1293 replyPackage.setOriginID(inPackage.getID());
1294 replyPackage.setType(IPackage.REPLY);
1295 addToOutQueue(replyPackage);
1296 }
1297
1298 protected synchronized void consumePackage(IPackage inPackage)
1299 {
1300 for (int i = 0; i < mMessageConsumers.size(); i++)
1301 if (((IMessageConsumer) mMessageConsumers.get(i)).acceptsMessage(inPackage.getMessage()))
1302 {
1303 ((IMessageConsumer) mMessageConsumers.get(i)).consumeMessage(inPackage.getMessage());
1304 return;
1305 }
1306
1307 log(this, WARNING, "Package not accepted: " + inPackage.toString());
1308 }
1309 }
1310 }