1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.caleigo.core;
20
21 import java.util.*;
22
23 import org.caleigo.core.AbstractDataService.AbstractDataTransaction.DataOperation;
24 import org.caleigo.core.exception.*;
25 import org.caleigo.service.ServiceException;
26 import org.caleigo.toolkit.log.Log;
27 import org.caleigo.toolkit.tunnel.*;
28
29
/** AbstractDataService provides an abstract implementation of the IDataService
 * interface that uses an abstract IDataTransaction to implement its function.
 * Descendants only need to implement the newTransaction() and ping() methods
 * plus the commit() method of the internal AbstractDataTransaction class.
34 *
35 * @author Dennis Zikovic
36 * @version 1.00
37 *
38 *//*
39 *
40 * WHEN WHO WHY & WHAT
41 * ------------------------------------------------------------------------------
42 * 2001-07-20 Dennis Zikovic Creation
43 */
44 public abstract class AbstractDataService implements IDataService
45 {
46
47 public final static int DEFAULT_TIMEOUT = 0;
48
49
50 private Object mServiceType;
51 private Object mServiceIdentity;
52 private IDataSourceDescriptor mDataSourceDescriptor;
53
54 private int mTimeout = DEFAULT_TIMEOUT;
55
56 private boolean mIsValidating = true;
57
58
59
60 /*** Default constructor for AbstractDataService.
61 */
62 public AbstractDataService(Object serviceType, Object serviceIdentity, IDataSourceDescriptor dataSourceDescriptor)
63 {
64 mServiceType = serviceType;
65 mServiceIdentity = serviceIdentity;
66 mDataSourceDescriptor = dataSourceDescriptor;
67
68
69
70 if(mDataSourceDescriptor.getDefaultDataSource()==null)
71 mDataSourceDescriptor.setDefaultDataSource(new SingleServiceDataSource(this));
72 }
73
74
75 public abstract IDataTransaction newTransaction();
76
77 /*** Should return true if the service is online and reponding to calls.
78 * Note that initializeService() must be called prior to this method.
79 */
80 public abstract boolean ping();
81
82
83 public IDataSourceDescriptor getDataSourceDescriptor()
84 {
85 return mDataSourceDescriptor;
86 }
87
88 public IEntity load(IEntityDescriptor entityDescriptor, Qualifier identitQualifier) throws DataServiceException
89 {
90 IDataTransaction transaction = this.newTransaction();
91 IEntity entity = entityDescriptor.createEntity();
92 transaction.addLoad(identitQualifier, entity);
93 transaction.commit();
94 return entity.isEmpty() ? null : entity;
95 }
96
97 public ISelection loadSelection(IEntityDescriptor entityDescriptor, Qualifier qualifier) throws DataServiceException
98 {
99 IDataTransaction transaction = this.newTransaction();
100 ISelection selection = new Selection(entityDescriptor);
101 transaction.addLoadSelection(entityDescriptor, qualifier, selection);
102 transaction.commit();
103 return selection;
104 }
105
106 public ISelection loadSelection(DataQuery dataQuery) throws DataServiceException
107 {
108 IDataTransaction transaction = this.newTransaction();
109 ISelection selection = new Selection(dataQuery.getEntityDescriptor());
110 transaction.addLoadSelection(dataQuery, selection);
111 transaction.commit();
112 return selection;
113 }
114
115 public void store(IEntity entity) throws DataServiceException
116 {
117 this.checkEntityAsStorable(entity);
118
119 IDataTransaction transaction = this.newTransaction();
120 transaction.addStore(entity);
121 transaction.commit();
122 }
123
124 public void delete(IEntity entity) throws DataServiceException
125 {
126 this.checkEntityAsDeletable(entity);
127
128 IDataTransaction transaction = this.newTransaction();
129 transaction.addDelete(entity);
130 transaction.commit();
131 }
132
133 public void refresh(IEntity entity) throws DataServiceException
134 {
135 IDataTransaction transaction = this.newTransaction();
136 transaction.addRefresh(entity);
137 transaction.commit();
138 }
139
140
141
142 /*** This method is always called by the ServiceManager before any other
143 * method in the service is called.
144 */
145 public void initializeService()
146 {
147 }
148
149 /*** This method is always called by the ServiceManager before the service
150 * is released. No other method calls will then be performed on this
151 * instance of the service.
152 */
153 public void finalizeService()
154 {
155 }
156
157 /*** Returns the service object that this service provides. This object
158 * should always implement the interface defined by the service interface
159 * class.
160 */
161 public Object getServiceInterface()
162 {
163 return this;
164 }
165
166 /*** Returns the class object the defines the service interface that all
167 * services with the same type provides.
168 */
169 public Class getServiceInterfaceClass()
170 {
171 return IDataService.class;
172 }
173
174 /*** Returns a URI that defines the type of this service. Note that
175 * there can be multiple services implementing the same service type but
176 * implementations of a given type must implement the same service
177 * interface.
178 */
179 public Object getServiceType()
180 {
181 return mServiceType;
182 }
183
184 /*** Returns the URI defining the a unique individual service.
185 */
186 public Object getServiceIdentity()
187 {
188 return mServiceIdentity;
189 }
190
191 /*** If this method returns true then the method getCustomProxyService must
192 * not return <code>null</code>.
193 */
194 public boolean hasCustomProxyService()
195 {
196 return true;
197 }
198
199 /*** Returns an IProxyService that is responsible for handling remote
200 * invokations of this service.
201 */
202 public org.caleigo.service.IProxyService getCustomProxyService(ITunnel tunnel)
203 {
204 TunneledDataService tunneledDataService = new TunneledDataService(
205 this.getServiceType(),
206 this.getServiceIdentity(),
207 this.getDataSourceDescriptor(),
208 tunnel);
209 tunnel.addMessageConsumer(new MessageConsumer(tunnel));
210 return tunneledDataService;
211 }
212
213
214
215
216
217
218
219 public boolean isValidating()
220 {
221 return mIsValidating;
222 }
223
224 public void setValidating(boolean validating)
225 {
226 mIsValidating = validating;
227 }
228
229 /*** Sets the timeout for asynchoniesly executed commits. It the timeout
230 * is set to 0, no timeout will be used.
231 *
232 * @param timeout the timeout in seconds.
233 */
234 public void setTimeout(int timeout)
235 {
236 mTimeout = timeout;
237 }
238
239 public int getTimeout()
240 {
241 return mTimeout;
242 }
243
244
245
246 /*** Checks if the entity is storable else an exception is thrown.
247 */
248 protected void checkEntityAsStorable(IEntity entity) throws DataServiceException
249 {
250 if(mIsValidating)
251 {
252 if(!entity.validateData().isValid())
253 throw new InvalidDataException("Store atempt on non-valid entity "+entity+"! ("+entity.validateData().getDisplayMessage()+")");
254
255 if(entity.isPersistent() && !entity.getEntityDescriptor().isEditable())
256 throw new DataServiceException("Update atempt on non-editable entity "+entity+"!");
257 else if(!entity.isPersistent() && !entity.getEntityDescriptor().isCreatable())
258 throw new DataServiceException("Create atempt on non-creatable entity "+entity+"!");
259 }
260
261 if(!DataAccessManager.getManager().hasWriteAccess(entity))
262 throw new SecurityException("No write access for "+entity+"!");
263 }
264
265 /*** Checks if the entity is deletable else an exception is thrown.
266 */
267 protected void checkEntityAsDeletable(IEntity entity) throws DataServiceException
268 {
269 if(mIsValidating)
270 {
271 if(!entity.getEntityDescriptor().isDeletable())
272 throw new DataServiceException("Delete atempt on non-deletable entity "+entity+"!");
273 }
274
275 if(!DataAccessManager.getManager().hasWriteAccess(entity))
276 throw new SecurityException("No write access for "+entity+"!");
277 }
278
279 /*** Help method that sets field data with better performance. The method
280 * does if possible avoid the setData method that has a considerable
281 * overhead in time and not least memory performance.
282 */
283 protected void setEntityData(IEntity entity, int j, Object data)
284 {
285 if(entity instanceof AbstractEntity)
286 ((AbstractEntity)entity).setRawData(j, data);
287 else
288 entity.setData(entity.getEntityDescriptor().getFieldDescriptor(j), data);
289 }
290
291
292 public abstract static class AbstractDataTransaction implements IDataTransaction, java.io.Serializable
293 {
294
295 private static int sIDCounter = 0;
296
297
298 private synchronized static int generateID()
299 {
300 sIDCounter++;
301 return sIDCounter;
302 }
303
304
305 private int mID;
306 private Vector mOperationQueue;
307 protected transient Job mJob;
308 private int mTimeout;
309
310
/** Creates an empty transaction and assigns it a newly generated ID.
 *
 * @param timeout timeout in seconds used by commitAsynchroniesly(); a
 * value of 0 or less means that no timeout thread is started.
 */
public AbstractDataTransaction(int timeout)
{
    this.mTimeout = timeout;
    this.mOperationQueue = new Vector(50);

    this.mID = generateID();
}
318
319
320 public synchronized void addLoad(Qualifier identityQualifier, IEntity resultEntity)
321 {
322 this.addOperation(new DataOperation(DataOperation.LOAD, identityQualifier, resultEntity));
323 }
324
325 public synchronized void addLoadSelection(IEntityDescriptor entityDescriptor, Qualifier qualifier, ISelection resultSelection)
326 {
327 this.addLoadSelection(new DataQuery(entityDescriptor, qualifier), resultSelection);
328 }
329
330 public synchronized void addLoadSelection(IEntityDescriptor entityDescriptor, Qualifier qualifier, IEntityRelationPath relationPath, ISelection resultSelection)
331 {
332 DataQuery dataQuery = new DataQuery(entityDescriptor);
333 dataQuery.addExternalQualifier(qualifier, relationPath);
334 this.addLoadSelection(dataQuery, resultSelection);
335 }
336
337 public synchronized void addLoadSelection(DataQuery dataQuery, ISelection resultSelection)
338 {
339 this.addOperation(new DataOperation(dataQuery, resultSelection));
340 }
341
342 public synchronized void addStore(IEntity entity)
343 {
344 this.addOperation(new DataOperation(DataOperation.STORE, entity));
345 }
346
347 public synchronized void addDelete(IEntity entity)
348 {
349 this.addOperation(new DataOperation(DataOperation.DELETE, entity));
350 }
351
352 public synchronized void addRefresh(IEntity entity)
353 {
354 this.addOperation(new DataOperation(DataOperation.REFRESH, entity));
355 }
356
357 public synchronized void addCreate(IEntityDescriptor entityDescriptor)
358 {
359 this.addOperation(new DataOperation(DataOperation.CREATE_TABLE, entityDescriptor));
360 }
361
362 public synchronized void addRelation(IEntityRelation relation)
363 {
364 this.addOperation(new DataOperation(DataOperation.CREATE_RELATION, relation));
365 }
366
367 /*** Help methods that returns true if the transaction is empty, wich means
368 * that no operations has been added to it and that commit neads no be
369 * called.
370 */
371 public boolean isEmpty()
372 {
373 return mOperationQueue.isEmpty();
374 }
375
376 /*** Help methods that returns true if one or more store or delete methods
377 * has been added to the transaction.
378 */
379 public boolean isMutating()
380 {
381 boolean mutating = false;
382 for(int j=0; mutating && j<mOperationQueue.size(); j++)
383 mutating = ((DataOperation)mOperationQueue.get(j)).getOperationType()==DataOperation.STORE
384 || ((DataOperation)mOperationQueue.get(j)).getOperationType()==DataOperation.DELETE;
385 return mutating;
386 }
387
388 /*** This method creates a Job that when executed will perform the same actions
389 * as the <code>commit</code> method.
390 *
391 * @param lock when the commit finishes all threads that waits on this object
392 * will be notified. This parameter may be <code>null</code>.
393 */
394 public synchronized Job commitAsynchroniesly(Object lock) throws DataServiceException
395 {
396 mJob = new Job(this);
397
398 Thread asynchExecutor = new AsyncCommitExecutor(mJob, lock);
399 asynchExecutor.start();
400
401 if (mTimeout > 0)
402 {
403 TimeoutThread timeoutThread = new TimeoutThread(mJob);
404 timeoutThread.start();
405 }
406
407 return mJob;
408 }
409
410
411
412 /*** Commit performs all the stored operations in the transaction.
413 * If any of the operations fail a rollback on all operations will be
414 * automatically performed and a TransactionFailedException will be thrown.
415 */
416 public abstract void commit() throws DataServiceException;
417
418
419 public int getID()
420 {
421 return mID;
422 }
423
424
425 protected void addOperation(DataOperation op)
426 {
427 mOperationQueue.add(op);
428 }
429
430 protected void updateProgress(int maxProgress, int currentProgress)
431 {
432 if (mJob != null)
433 mJob.fireProgressChangedEvent(maxProgress, currentProgress);
434 }
435
436 public Enumeration getOperations()
437 {
438 return mOperationQueue.elements();
439 }
440
441 public int getNbrOfOperations()
442 {
443 return mOperationQueue.size();
444 }
445
446
447 protected Job getJob()
448 {
449 return mJob;
450 }
451
452
453 public static class DataOperation implements java.io.Serializable
454 {
455
456 public static final int LOAD = 1;
457 public static final int QUERY = 2;
458 public static final int STORE = 3;
459 public static final int DELETE = 4;
460 public static final int REFRESH = 5;
461 public static final int CREATE_TABLE = 6;
462 public static final int CREATE_RELATION = 7;
463
464
465 private int mOperationType;
466
467 private Qualifier mQualifier;
468 private IEntity mEntity;
469
470 private DataQuery mDataQuery;
471 private ISelection mEntitySelection;
472 private IEntityRelation mEntityRelation;
473 private IEntityDescriptor mEntityDescriptor;
474
475
476 public DataOperation(int operationType, IEntity entity)
477 {
478 mOperationType = operationType;
479 mEntity = entity;
480 }
481
482 public DataOperation(int operationType, Qualifier qualifier, IEntity entity)
483 {
484 mOperationType = operationType;
485 mQualifier = qualifier;
486 mEntity = entity;
487 }
488
489 public DataOperation(DataQuery dataQuery, ISelection entitySelection)
490 {
491 mOperationType = QUERY;
492 mDataQuery = dataQuery;
493 mEntitySelection = entitySelection;
494 }
495
496 public DataOperation(int operationType, IEntityDescriptor entityDescriptor)
497 {
498 mEntityDescriptor = entityDescriptor;
499 mOperationType = operationType;
500 }
501
502 public DataOperation(int operationType, IEntityRelation entityRelation)
503 {
504 mEntityRelation = entityRelation;
505 mOperationType = operationType;
506 }
507
508 public int getOperationType()
509 {
510 return mOperationType;
511 }
512
513 public Qualifier getQualifier()
514 {
515 if(mQualifier!=null)
516 return mQualifier;
517 else if(mEntity!=null)
518 return mEntity.getOriginQualifier();
519 else
520 return null;
521 }
522
523 public IEntity getEntity()
524 {
525 return mEntity;
526 }
527
528 public DataQuery getDataQuery()
529 {
530 return mDataQuery;
531 }
532
533 public ISelection getEntitySelection()
534 {
535 return mEntitySelection;
536 }
537
538 public IEntityDescriptor getEntityDescriptor()
539 {
540 return mEntityDescriptor;
541 }
542
543 public IEntityRelation getEntityRelation()
544 {
545 return mEntityRelation;
546 }
547 }
548
549 private class AsyncCommitExecutor extends Thread
550 {
551
552 private Job mJob;
553 private Object mLock;
554
555
556 public AsyncCommitExecutor(Job job, Object lock)
557 {
558 super("Async Commit Executor");
559
560 mJob = job;
561 mLock = lock;
562 }
563
564
565 public void run()
566 {
567 try
568 {
569 mJob.fireJobStartedEvent();
570 commit();
571 mJob.fireJobCompletedEvent();
572 } catch (Throwable t)
573 {
574 mJob.fireJobFailedEvent(t);
575 }
576 finally
577 {
578 if (mLock != null)
579 synchronized (mLock)
580 {
581 mLock.notifyAll();
582 }
583 }
584 }
585 }
586
587 private class TimeoutThread extends Thread
588 {
589
590 private Job mJob;
591
592
593 public TimeoutThread(Job job)
594 {
595 mJob = job;
596 }
597
598
599 public void run()
600 {
601 try
602 {
603 Thread.sleep(mTimeout * 1000);
604
605 if (!mJob.isFinished())
606 {
607 abortTransaction();
608 mJob.fireJobFailedEvent(new ServiceException("Execution timed out after " + mTimeout + " seconds."));
609 }
610 } catch (Throwable t)
611 {
612 Log.printError(this, "Error occured in timeout thread", t);
613 }
614 }
615 }
616 }
617
618 private class MessageConsumer implements IMessageConsumer
619 {
620
621 private ITunnel mTunnel;
622 private HashMap mLocalTransactions = new HashMap();
623
624
625 public MessageConsumer(ITunnel tunnel)
626 {
627 mTunnel = tunnel;
628 }
629
630
631
632 /***
633 * Returns <code>true</code> if this IMessageConsumer accepts the message.
634 * This method is allways called before <code>consumeMessage</code> for any
635 * given message.
636 */
637 public boolean acceptsMessage(Object message)
638 {
639 return ((message instanceof TunneledDataService.TunneledDataServiceCommitMessage ||
640 message instanceof TunneledDataService.TunneledDataServiceAbortTransactionMessage)&&
641 ((TunneledDataService.TunneledDataServiceMessage) message).mServiceInterfaceClass.equals(getServiceInterfaceClass()) &&
642 ((TunneledDataService.TunneledDataServiceMessage) message).mServiceIdentity.equals(mServiceIdentity) &&
643 ((TunneledDataService.TunneledDataServiceMessage) message).mServiceType.equals(mServiceType));
644 }
645
646 /***
647 * Tells the IMessageConsumer to consume a message.
648 */
649 public void consumeMessage(Object message)
650 {
651 }
652
653 /***
654 * Tells the IMessageConsumer to consume a message and returns a new message
655 * that will be delivered to the sender of the originial message.
656 */
657 public Object answerMessage(Object message)
658 {
659 if (message instanceof TunneledDataService.TunneledDataServiceCommitMessage)
660 {
661 Log.print(this, "Commit message received : " + message.getClass().getName());
662 AbstractDataTransaction messageTransaction = null;
663 try
664 {
665 final TunneledDataService.TunneledDataServiceCommitMessage commitMesage =
666 (TunneledDataService.TunneledDataServiceCommitMessage) message;
667
668 messageTransaction = commitMesage.mTransaction;
669 AbstractDataTransaction localTransaction = (AbstractDataTransaction) newTransaction();
670 mLocalTransactions.put(new Integer(messageTransaction.getID()), localTransaction);
671
672
673 Enumeration operations = messageTransaction.getOperations();
674 while (operations.hasMoreElements())
675 {
676 DataOperation oper = (DataOperation) operations.nextElement();
677 localTransaction.addOperation(oper);
678 Log.print(this, "Added op : Type = " + oper.mOperationType + ", query = " + oper.getDataQuery().getQualifier().toString());
679 }
680 localTransaction.commit();
681
682 Enumeration localOperations = localTransaction.getOperations();
683 Vector tempOperationHolder = new Vector();
684 while (localOperations.hasMoreElements())
685 tempOperationHolder.add(localOperations.nextElement());
686 return new TunneledDataService.TunneledDataServiceReturnMessage((AbstractDataTransaction.DataOperation[]) tempOperationHolder.toArray(new AbstractDataTransaction.DataOperation[0]));
687 }
688 catch (DataServiceException e)
689 {
690 return new TunneledDataService.TunneledDataServiceReturnMessage(e);
691 }
692 catch (Exception e)
693 {
694 return new TunneledDataService.TunneledDataServiceReturnMessage(new DataServiceException("Commit failed", e));
695 }
696 finally
697 {
698 if (messageTransaction != null)
699 mLocalTransactions.remove(new Integer(messageTransaction.getID()));
700 }
701 }
702 else if (message instanceof TunneledDataService.TunneledDataServiceAbortTransactionMessage)
703 {
704 Log.print(this, "Abort transaction message received");
705 TunneledDataService.TunneledDataServiceAbortTransactionMessage abortMessage =
706 (TunneledDataService.TunneledDataServiceAbortTransactionMessage) message;
707 if (mLocalTransactions.containsKey(new Integer(abortMessage.getTransactionID())))
708 ((IDataTransaction) mLocalTransactions.get(new Integer(abortMessage.getTransactionID()))).abortTransaction();
709 }
710
711 return null;
712 }
713 }
714 }