View Javadoc

1   /* (c) Copyright 2003 Caleigo AB, All rights reserved. 
2    * 
3    * This library is free software; you can redistribute it and/or
4    * modify it under the terms of the GNU Lesser General Public
5    * License as published by the Free Software Foundation; either
6    * version 2.1 of the License, or (at your option) any later version.
7    * 
8    * This library is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11   * Lesser General Public License for more details.
12   * 
13   * You should have received a copy of the GNU Lesser General Public
14   * License along with this library; if not, write to the Free Software
15   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16   *  
17   */
18  
19  package org.caleigo.toolkit.tunnel;
20  
21  
22  import java.util.*;
23  import java.io.*;
24  
25  import org.caleigo.toolkit.log.*;
26  import org.caleigo.toolkit.util.CircularByteBuffer;
27  
28  /***
29   *
30   * @author  Mattias Hagstrand
31   * @version 1.00
32   * 
33   *//* 
34   *
35   * WHEN        WHO               WHY & WHAT
36   * -----------------------------------------------------------------------------
37   * 2002-07-24  Mattias Hagstrand Creation
38   */
39  public class HTTPTunnelServer extends AbstractTunnelServer
40  {
41      // Data members ------------------------------------------------------------
42      protected int mCurrentSessionID;
43      protected Hashtable mRegisteredTunnels;
44      
45      // Constructors ------------------------------------------------------------
46      public HTTPTunnelServer()
47      {
48          mRegisteredTunnels = new Hashtable();
49          this.addMessageConsumer(new CloseMessageConsumer());
50      }
51      
52      // Action methods ----------------------------------------------------------
53      
54      /***
55       * Sets up a new connection. This method will block until the connection is
56       * closed.
57       */
58      protected void newConnection(Object connectionMessage, OutputStream outputStream, int sessionID)
59      {
60          Log.print(this, "newConnection: " + sessionID);
61          
62          ObjectOutputStream objectOutputStream = null;
63          try
64          {
65              objectOutputStream = new ObjectOutputStream(outputStream);
66          }
67          catch (IOException ioe)
68          {
69              Log.printError(this, "Couldn't create output stream", ioe);
70              return;
71          }
72          
73          HTTPTunnel tunnel = null;
74          try
75          {
76              // Create and initialize a tunnel
77              HTTPInputStream httpInputStream = new HTTPInputStream(1024);
78              tunnel = new HTTPTunnel(httpInputStream, outputStream);
79              setUpTunnel(connectionMessage, tunnel);
80              Log.print(this, "Connection set up successfully!");
81              Object returnMessage = new AbstractTunnel.DefaultStatusMessage(AbstractTunnel.IStatusMessage.OK,
82                                                                             "Connection established successfully");
83              returnMessage = new HTTPTunnel.HTTPTunnelStatusMessage((AbstractTunnel.IStatusMessage) returnMessage, sessionID);
84  
85              // Send return message
86              Log.print(this, "Sending return message: " + returnMessage.toString());
87              objectOutputStream.writeObject(returnMessage);
88              objectOutputStream.flush();
89          }
90          catch (Exception e)
91          {
92              Log.printError(this, "Couldn't set up connection", e);
93              Object returnMessage = new AbstractTunnel.DefaultStatusMessage(AbstractTunnel.IStatusMessage.NOT_OK,
94                                                                             e.getMessage());
95              try
96              {
97                  objectOutputStream.writeObject(returnMessage);
98                  objectOutputStream.flush();
99              }
100             catch (IOException ioe)
101             {
102                 Log.printError(this, "Couldn't send return message", ioe);
103             }
104             return;
105         }
106         try
107         {
108             tunnel.initialize();
109 
110             // If the connection was set up successfully register the tunnel and
111             // block the thread
112             Integer sessionIDKey = new Integer(sessionID);
113             mRegisteredTunnels.put(sessionIDKey, tunnel);
114             this.waitForSessionClose(sessionIDKey);
115         }
116         catch (Exception e)
117         {
118             Log.printError(this, "Couldn't initialize tunnel", e);
119         }
120     }
121     
122     protected void closeConnection(int sessionID)
123     {
124         Log.print(this, "closeConnection: " + sessionID);
125         HTTPTunnel tunnel = (HTTPTunnel) mRegisteredTunnels.get(new Integer(sessionID));
126         if (tunnel != null)
127             //((HTTPInputStream) tunnel.getInputStream()).eof();
128             try
129             {
130                 if (tunnel.isActive())
131                     tunnel.finalize();
132             }
133             catch (Exception e)
134             {
135                 Log.printError(this, "Couldn't finalize tunnel", e);
136             }
137             finally
138             {
139                 this.doOnTunnelClosed(tunnel);
140             }
141     }
142     
143     // Superclass overrides ----------------------------------------------------
144     public void tunnelClosed(TunnelEvent event)
145     {
146         super.tunnelClosed(event);
147         this.doOnTunnelClosed(event.getSourceTunnel());
148     }
149     
150     public void connectionLost(TunnelEvent event)
151     {
152         super.connectionLost(event);
153         this.doOnTunnelClosed(event.getSourceTunnel());
154     }
155         
156     // Action methods ----------------------------------------------------------
157     protected synchronized int createSessionID()
158     {
159         if (mCurrentSessionID < Integer.MAX_VALUE)
160             mCurrentSessionID++;
161         else
162             mCurrentSessionID = 1;
163         
164         return mCurrentSessionID;
165     }
166     
167     protected synchronized void handleInputData(int sessionID, byte[] data)
168     {
169         HTTPInputStream httpInputStream = (HTTPInputStream) ((HTTPTunnel) mRegisteredTunnels.get(new Integer(sessionID))).getInputStream();
170         if (httpInputStream != null)
171             httpInputStream.addInputData(data);
172         else
173             Log.printWarning(this, "Couldn't handle input data for session id " + sessionID);
174     }
175     
176     // Help methods ------------------------------------------------------------
177         
178     protected synchronized void doOnTunnelClosed(ITunnel tunnel)
179     {
180         if (tunnel != null)
181             synchronized (tunnel)
182             {
183                 tunnel.notifyAll();
184             }
185     }
186     
187     protected void waitForSessionClose(Integer sessionID)
188     {
189         synchronized (mRegisteredTunnels.get(sessionID))
190         {
191             try
192             {
193                 // Wait until the session has been closed
194                 mRegisteredTunnels.get(sessionID).wait();
195                 
196                 // Unregister the tunnel
197                 mRegisteredTunnels.remove(sessionID);
198             }
199             catch (InterruptedException ie)
200             {
201                 Log.printError(this, "Interrupted", ie);
202             }
203         }
204         
205         Log.print(this, "Session closed");
206     }
207     
208     // Nested classes ----------------------------------------------------------
209     protected class CloseMessageConsumer implements IMessageConsumer
210     {
211         // IMessageConsumer implementation -------------------------------------
212         
213         /*** Returns <code>true</code> if this IMessageConsumer accepts the message.
214          * This method is allways called before <code>consumeMessage</code> for any
215          * given message.
216          *
217          */
218         public boolean acceptsMessage(Object message)
219         {
220             return (message instanceof HTTPTunnel.HTTPTunnelCloseMessage);
221         }
222         
223         /*** Tells the IMessageConsumer to consume a message and returns a new message
224          * that will be delivered to the sender of the originial message.
225          *
226          */
227         public Object answerMessage(Object message)
228         {
229             return null;
230         }
231         
232         /*** Tells the IMessageConsumer to consume a message.
233          *
234          */
235         public void consumeMessage(Object message)
236         {
237             Log.print(this, "Close message received");
238             closeConnection(((HTTPTunnel.HTTPTunnelCloseMessage) message).getSessionID());
239         }
240     }
241     
242     protected class HTTPInputStream extends InputStream
243     {
244         // Data members --------------------------------------------------------
245         protected byte[] mSingleByteBuffer = new byte[1];
246         protected CircularByteBuffer mBuffer;
247         protected boolean mEOF;
248         protected boolean mClosed;
249         
250         // Constructors --------------------------------------------------------
251         public HTTPInputStream(int bufferSize)
252         {
253             mBuffer = new CircularByteBuffer(bufferSize);
254         }
255         
256         // Superclass overrides ------------------------------------------------
257         public int available()
258             throws IOException
259         {
260             return mBuffer.getBufferSize();
261         }
262         
263         public void close()
264            throws IOException
265         {
266             mClosed = true;
267             synchronized (mBuffer)
268             {
269                 mBuffer.notifyAll();
270             }
271         }
272         
273         public int read()
274             throws IOException
275         {
276             this.read(mSingleByteBuffer, 0, 1);
277             return mSingleByteBuffer[0];
278         }
279         
280         public int read(byte[] b, int off, int len)
281             throws IOException
282         {
283             synchronized (mBuffer)
284             {
285                 while (mBuffer.getBufferSize() == 0)
286                 {
287                     try
288                     {
289                         if (mEOF)
290                             throw new EOFException();
291                         
292                         if (mClosed)
293                             return -1;
294                         
295                         mBuffer.wait();
296                     }
297                     catch (InterruptedException ie)
298                     {
299                         throw new IOException(ie.getMessage());
300                     }
301                 }
302             }
303             
304             return mBuffer.getFromBuffer(b, off, len);
305         }
306         
307         // Access methods ------------------------------------------------------
308         protected void addInputData(byte[] data)
309         {
310             synchronized (mBuffer)
311             {
312                 mBuffer.addToBuffer(data, 0, data.length);
313                 mBuffer.notifyAll();
314             }
315         }
316         
317         protected void eof()
318         {
319             mEOF = true;
320             synchronized (mBuffer)
321             {
322                 mBuffer.notifyAll();
323             }
324         }
325     }
326 }