001/* 002 * Copyright (c) 2007-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-12-17 17:59:53 +0000 (Thu, 17 Dec 2015) $' 007 * '$Revision: 34345 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 
027 * 028 */ 029 030package org.kepler.provenance.sql; 031 032import java.sql.PreparedStatement; 033import java.sql.ResultSet; 034import java.sql.SQLException; 035import java.sql.Timestamp; 036import java.util.Collections; 037import java.util.Date; 038import java.util.LinkedHashMap; 039import java.util.Map; 040import java.util.WeakHashMap; 041 042import org.kepler.provenance.FireState; 043import org.kepler.provenance.Recording; 044import org.kepler.provenance.RecordingException; 045import org.kepler.provenance.RecordingParameters; 046import org.kepler.util.sql.DatabaseFactory; 047import org.kepler.util.sql.DatabaseType; 048import org.kepler.util.sql.Schema; 049 050import ptolemy.actor.Actor; 051import ptolemy.actor.Director; 052import ptolemy.actor.FiringEvent; 053import ptolemy.actor.IOPort; 054import ptolemy.actor.IOPortEvent; 055import ptolemy.actor.IORelation; 056import ptolemy.actor.LocalClock; 057import ptolemy.actor.TypedIOPort; 058import ptolemy.actor.gui.ColorAttribute; 059import ptolemy.actor.gui.SizeAttribute; 060import ptolemy.actor.gui.WindowPropertiesAttribute; 061import ptolemy.actor.gui.style.LineStyle; 062import ptolemy.actor.gui.style.TextStyle; 063import ptolemy.actor.parameters.ParameterPort; 064import ptolemy.actor.parameters.PortParameter; 065import ptolemy.actor.sched.Scheduler; 066import ptolemy.data.StringToken; 067import ptolemy.data.Token; 068import ptolemy.data.expr.Parameter; 069import ptolemy.domains.sdf.kernel.SDFDirector; 070import ptolemy.kernel.undo.UndoStackAttribute; 071import ptolemy.kernel.util.AbstractSettableAttribute; 072import ptolemy.kernel.util.Attribute; 073import ptolemy.kernel.util.ConfigurableAttribute; 074import ptolemy.kernel.util.IllegalActionException; 075import ptolemy.kernel.util.Location; 076import ptolemy.kernel.util.NameDuplicationException; 077import ptolemy.kernel.util.Nameable; 078import ptolemy.kernel.util.NamedObj; 079import ptolemy.vergil.basic.KeplerDocumentationAttribute; 080import 
ptolemy.vergil.icon.BoxedValueIcon; 081import ptolemy.vergil.icon.EditIconTableau; 082import ptolemy.vergil.icon.EditorIcon; 083import ptolemy.vergil.icon.TextIcon; 084import ptolemy.vergil.icon.UpdatedValueIcon; 085import ptolemy.vergil.icon.ValueIcon; 086import ptolemy.vergil.icon.XMLIcon; 087 088 089/** SQL Implementation of Recording using SDM SPA schema. 090 * 091 * @author Daniel Crawl 092 * @version $Id: SQLRecording.java 34345 2015-12-17 17:59:53Z crawl $ 093 * 094 */ 095 096public class SQLRecording extends Recording 097{ 098 /** Construct a new SQLRecording. */ 099 public SQLRecording() throws RecordingException 100 { 101 super(); 102 103 _dbType = null; 104 105 _schema = null; 106 107 // NOTE: we use a LinkedHashMap instead of a HashMap for predictable 108 // iteration order (the order in which keys are inserted into it). 109 // The map is iterated at the end of workflow execution to record 110 // the stop of rw-firings and we want a repeatable ordering for 111 // tests. 112 _fireStateTable = //Collections.synchronizedMap( 113 new LinkedHashMap<Actor, FireState<Integer>>();//); 114 115 _entityCacheTable = Collections.synchronizedMap( 116 new WeakHashMap<Nameable, RegEntity>()); 117 118 _dbReset(); 119 } 120 121 /** Stop recording. Called when the Recording type changes 122 * or ProvenanceListener removed from canvas. 123 * NOTE: this is NOT called when the GUI exits or when the 124 * workflow runs from the command line. 125 * 126 * This is the last method called to a Recording instance. 127 */ 128 @Override 129 public void disconnect() throws RecordingException 130 { 131 _dbReset(); 132 } 133 134 // Specification 135 136 /** Register an actor. 
*/ 137 @Override 138 public boolean regActor(Actor actor) throws RecordingException 139 { 140 //_debug("regActor(" + _getNameableFullName(actor) + ")"); 141 142 int id = _regActorDirector((NamedObj)actor, true); 143 144 FireState<Integer> fs = new FireState<Integer>(actor, id); 145 _fireStateTable.put(actor, fs); 146 147 return true; 148 } 149 150 /** Register a director. */ 151 @Override 152 public boolean regDirector(Director director) throws RecordingException 153 { 154 _regActorDirector(director, false); 155 return true; 156 } 157 158 /** Register a parameter. A parameter can be any <b>entity</b> 159 * stored in the MoML that does not have its own 160 * <code>regNNN()</code> method. This can be user-level 161 * parameters (e.g., Parameter, StringParameter, etc.) or 162 * internal to Kepler (e.g., _location, semanticType000, etc.). 163 * (A "parameter" corresponds to a property in the MoML). 164 * 165 */ 166 @Override 167 public boolean regParameter(NamedObj parameter) throws RecordingException 168 { 169 boolean ignore = false; 170 171 String name = parameter.getName(); 172 173 if(parameter instanceof WindowPropertiesAttribute || 174 parameter instanceof ConfigurableAttribute || 175 parameter instanceof Location || 176 parameter instanceof KeplerDocumentationAttribute || 177 parameter instanceof SizeAttribute || 178 parameter instanceof ColorAttribute || 179 parameter instanceof TextIcon || 180 parameter instanceof EditIconTableau.Factory || 181 parameter instanceof TextStyle || 182 parameter instanceof ValueIcon || 183 parameter instanceof XMLIcon || 184 parameter instanceof LineStyle || 185 parameter instanceof UndoStackAttribute || 186 parameter instanceof BoxedValueIcon || 187 parameter instanceof UpdatedValueIcon || 188 parameter instanceof EditorIcon || 189 parameter instanceof org.kepler.moml.NamedObjIdReferralList || 190 parameter instanceof LocalClock || 191 parameter instanceof Scheduler || 192 ((parameter.getContainer() instanceof SDFDirector) && 193 
(name.equals(SDFDirector.AUTO_NAME) || name.equals(SDFDirector.UNBOUNDED_NAME))) || 194 ((parameter.getContainer() instanceof Director) && 195 (name.equals("startTime") || name.equals("stopTime"))) || 196 name.equals("bold") || 197 name.equals("italic") || 198 name.equals("fontFamily") || 199 name.equals("textSize") || 200 name.equals("_notDraggable") || 201 name.indexOf("_vergil") == 0 || 202 name.indexOf("_hide") == 0 || 203 name.equals("_showName")) 204 { 205 ignore = true; 206 } 207 else 208 { 209 RegEntity re; 210 boolean isPort = false; 211 212 // if it's a PortParameter, we'll need to register both the 213 // port and parameter. 214 if(parameter instanceof PortParameter) 215 { 216 re = _checkEntity(parameter, RegEntity.EntityType.PortParameter); 217 isPort = true; 218 } 219 else 220 { 221 re = _checkEntity(parameter, RegEntity.EntityType.Parameter); 222 } 223 224 if(re.isNew()) 225 { 226 _regParameterReal(parameter, re); 227 228 if(isPort) 229 { 230 _regPortReal(((PortParameter)parameter).getPort(), re); 231 } 232 } 233 } 234 235 return (! ignore); 236 } 237 238 /** Register a link between two endpoints. */ 239 @Override 240 public boolean regLink(NamedObj endPoint1, NamedObj endPoint2) 241 throws RecordingException 242 { 243 244 /* 245 _debug("regLink(" + _getNameableFullName(endPoint1) + ", " + 246 _getNameableFullName(endPoint2) + ")"); 247 */ 248 249 // make sure both endpoints have been registered. 
250 RegEntity.EntityType type = RegEntity.EntityType.getType(endPoint1); 251 RegEntity re1 = _checkEntity(endPoint1, type); 252 if(re1 == null) 253 { 254 throw new RecordingException("endPoint1 is not registered."); 255 } 256 257 type = RegEntity.EntityType.getType(endPoint2); 258 RegEntity re2 = _checkEntity(endPoint2, type); 259 if(re2 == null) 260 { 261 throw new RecordingException("endPoint2 is not registered."); 262 } 263 264 Link link = new Link(endPoint1, endPoint2); 265 266 //_debug("linkname = " + linkName); 267 268 RegEntity reLink = _checkEntity(link, RegEntity.EntityType.Link); 269 if(reLink.isNew()) 270 { 271 try 272 { 273 synchronized(_psLinkInsert) 274 { 275 _psLinkInsert.setInt(1, reLink.getId()); 276 _psLinkInsert.setInt(2, re1.getId()); 277 _psLinkInsert.setInt(3, re2.getId()); 278 _psLinkInsert.executeUpdate(); 279 } 280 281 if(_debugWriter != null) 282 { 283 _debugWrite("INSERT INTO LINK (" + reLink.getId() + 284 ", " + re1.getId() + ", " + re2.getId() + ")"); 285 } 286 } 287 catch(SQLException e) 288 { 289 throw new RecordingException(_getExceptionMessage(e)); 290 } 291 } 292 /* 293 else 294 { 295 _debug("link already recorded: " + linkName); 296 } 297 */ 298 299 return true; 300 301 } 302 303 /** Register a port or portparameter. */ 304 @Override 305 public boolean regPort(TypedIOPort port) throws RecordingException 306 { 307 //_debug("regPort(" + _getNameableFullName(port) + ")"); 308 // 309 310 RegEntity re; 311 boolean isParameter = false; 312 313 // if it's a ParameterPort, we'll need to register both the 314 // port and parameter. 
315 if(port instanceof ParameterPort) 316 { 317 re = _checkEntity(port, RegEntity.EntityType.PortParameter); 318 isParameter = true; 319 } 320 else 321 { 322 re = _checkEntity(port, RegEntity.EntityType.Port); 323 } 324 325 if(re.isNew()) 326 { 327 _regPortReal(port, re); 328 329 if(isParameter) 330 { 331 _regParameterReal(((ParameterPort)port).getParameter(), re); 332 } 333 } 334 335 return true; 336 } 337 338 /** Register a relation. */ 339 @Override 340 public boolean regRelation(IORelation relation) throws RecordingException 341 { 342 //_debug("regRelation(" + _getNameableFullName(relation) + ")"); 343 344 RegEntity re = _checkEntity(relation, RegEntity.EntityType.Relation); 345 if(re.isNew()) 346 { 347 try 348 { 349 int width = relation.getWidth(); 350 synchronized(_psRelationInsert) 351 { 352 _psRelationInsert.setInt(1, re.getId()); 353 _psRelationInsert.setInt(2, width); 354 _psRelationInsert.executeUpdate(); 355 } 356 if(_debugWriter != null) 357 { 358 _debugWrite("INSERT INTO RELATION (id, " + 359 width + ")"); 360 } 361 } 362 catch(SQLException e) 363 { 364 throw new RecordingException(_getExceptionMessage(e)); 365 } 366 catch(IllegalActionException e) 367 { 368 throw new RecordingException(_getExceptionMessage(e)); 369 } 370 } 371 372 return true; 373 } 374 375 /** Called before registering workflow contents. */ 376 @Override 377 public void specificationStart() throws RecordingException 378 { 379 super.specificationStart(); 380 381 // clear the entity cache table so that entity renames are handled 382 // FIXME this will cause performance problems on large workflows 383 _entityCacheTable.clear(); 384 385 // always clear the fire state table since the id of actors 386 // can change between workflow executions due to the actor 387 // being renamed. 388 _fireStateTable.clear(); 389 } 390 391 392 //////////////////////////////////////////////////////////////////////// 393 //// Execution interface //// 394 395 /** Record the starting of workflow execution. 
*/ 396 @Override 397 public void executionStart(Date timestamp) throws RecordingException 398 { 399 if(_wfExecId != RegEntity.UNKNOWN_ID) 400 { 401 throw new RecordingException("Workflow already running."); 402 } 403 404 // NOTE: do not set this to false so that workflow contents 405 // will be registered again. this handles cases where entities 406 // are renamed between runs. 407 //_needWorkflowContents(false); 408 409 if(_wfUserStr == null) 410 { 411 throw new RecordingException("Need workflow user name"); 412 } 413 414 // update the db 415 try 416 { 417 synchronized(_psWorkflowExecStart) 418 { 419 _psWorkflowExecStart.setInt(1, _wfId); 420 _psWorkflowExecStart.setString(2, _wfUserStr); 421 _psWorkflowExecStart.setTimestamp(3, new Timestamp(timestamp.getTime())); 422 _wfExecId = _dbType.insert(_psWorkflowExecStart, 423 "workflow_exec", "id"); 424 } 425 426 if(_debugWriter != null) 427 { 428 _debugWrite("INSERT INTO WORKFLOW_EXEC (" + _wfId + 429 ", " + _wfUserStr + ", curTime)"); 430 } 431 } 432 catch(SQLException e) 433 { 434 throw new RecordingException(_getExceptionMessage(e)); 435 } 436 } 437 438 /** Record the stopping of workflow execution. */ 439 @Override 440 public void executionStop(Date timestamp) throws RecordingException 441 { 442 if(_wfExecId == RegEntity.UNKNOWN_ID) 443 { 444 throw new RecordingException("Workflow not running."); 445 } 446 try 447 { 448 // stop any actors that may be rw firing 449 for(FireState<Integer> fs : _fireStateTable.values()) 450 { 451 synchronized(fs) 452 { 453 Integer id = fs.getCurFireId(true); 454 if(id != null) 455 { 456 _recordFiringEvent(fs.getActor(), 457 FiringEvent.AFTER_RW_FIRE, timestamp); 458 // reset the port state 459 // XXX what about port events between wf execs? 
460 fs.setPortLastAccess( 461 FireState.PortAccessType.Uninitialized); 462 } 463 464 id = fs.getCurFireId(false); 465 if(id != null) 466 { 467 _recordFiringEvent(fs.getActor(), 468 fs.getNextStopFiringType(), timestamp); 469 } 470 } 471 } 472 473 // 474 synchronized(_psWorkflowExecStop) 475 { 476 _psWorkflowExecStop.setTimestamp(1, new Timestamp(timestamp.getTime())); 477 _psWorkflowExecStop.setInt(2, _wfExecId); 478 _psWorkflowExecStop.executeUpdate(); 479 } 480 481 if(_debugWriter != null) 482 { 483 _debugWrite("UPDATE WORKFLOW_EXEC SET end_time = " + 484 "curTime WHERE id = " + _wfExecId + ")"); 485 } 486 487 _wfExecId = RegEntity.UNKNOWN_ID; 488 } 489 catch(SQLException e) 490 { 491 throw new RecordingException(_getExceptionMessage(e)); 492 } 493 } 494 495 /** An execution was imported. */ 496 @Override 497 public void executionImported() 498 { 499 500 } 501 502 /** Record an actor fire event. */ 503 @Override 504 public void actorFire(FiringEvent event, Date timestamp) throws RecordingException 505 { 506 //_debug("actorFire: " + event); 507 _recordFiringEvent(event.getActor(), event.getType(), timestamp); 508 } 509 510 /** Record a port event. 
*/ 511 @Override 512 public void portEvent(IOPortEvent event, Date timestamp) throws RecordingException 513 { 514 IOPort port = (TypedIOPort)event.getPort(); 515 516 int type = event.getEventType(); 517 boolean isRead = false; 518 boolean recordEvent = false; 519 520 switch(type) 521 { 522 case IOPortEvent.SEND_BEGIN: 523 isRead = false; 524 recordEvent = true; 525 break; 526 case IOPortEvent.GET_END: 527 isRead = true; 528 recordEvent = true; 529 break; 530 // ignore get_begin and send_end 531 case IOPortEvent.GET_BEGIN: 532 case IOPortEvent.SEND_END: 533 break; 534 // warn if received unknown type 535 default: 536 _warn("unhandled type of IOPortEvent: " + type); 537 break; 538 } 539 540 Actor actor = (Actor)port.getContainer(); 541 FireState<Integer> fs = _fireStateTable.get(actor); 542 543 // XXX check for null 544 synchronized(fs) 545 { 546 // see if we need to generate rw fire event(s) 547 FireState.PortAccessType lastAccess = fs.getPortLastAccess(); 548 if(lastAccess == FireState.PortAccessType.Uninitialized) 549 { 550 _recordFiringEvent(actor, FiringEvent.BEFORE_RW_FIRE, timestamp); 551 } 552 else if(lastAccess == FireState.PortAccessType.Write && isRead) 553 { 554 _recordFiringEvent(actor, FiringEvent.AFTER_RW_FIRE, timestamp); 555 _recordFiringEvent(actor, FiringEvent.BEFORE_RW_FIRE, timestamp); 556 } 557 558 // record the type of last port access for this actor 559 if(isRead) 560 { 561 fs.setPortLastAccess(FireState.PortAccessType.Read); 562 } 563 else 564 { 565 fs.setPortLastAccess(FireState.PortAccessType.Write); 566 } 567 568 if(recordEvent) 569 { 570 /* 571 _Debug("port " + _getNameableFullName(port) + 572 (isRead ? " read " : " write ") + 573 (port.isOpaque() ? 
"opaque " : "transparent ")); 574 */ 575 //_Debug("type " + port.getType()); 576 577 Integer fireId = fs.getCurFireId(false); 578 if(fireId == null) 579 { 580 fireId = RegEntity.UNKNOWN_ID; 581 } 582 583 // getCurFireId may return null 584 int rwfireId = RegEntity.UNKNOWN_ID; 585 Integer rwfireIdInteger = fs.getCurFireId(true); 586 if(rwfireIdInteger != null) 587 { 588 rwfireId = rwfireIdInteger; 589 } 590 591 // an actor may read or write a token in a non-firing 592 // method. e.g., SampleDelay writes a token in initilize(). 593 if(fireId == RegEntity.UNKNOWN_ID) 594 { 595 /* 596 _warn("port event not in fire method for " + 597 _getNameableFullName(actor)); 598 */ 599 600 fireId = rwfireId; 601 } 602 603 if(event.getVectorLength() == IOPortEvent.SINGLETOKEN) 604 { 605 _recordPortEvent(port, fireId, rwfireId, isRead, 606 event.getChannel(), event.getToken(), 607 event.getReceiverPort(), timestamp); 608 } 609 else 610 { 611 Token[] tokenArray = event.getTokenArray(); 612 for(int i = 0; i < tokenArray.length; i++) 613 { 614 _recordPortEvent(port, fireId, rwfireId, isRead, 615 event.getChannel(), tokenArray[i], 616 event.getReceiverPort(), timestamp); 617 } 618 } 619 } 620 } 621 } 622 623 /** Add Parameters for ProvenanceListener. */ 624 @Override 625 public RecordingParameters generateParameters(NamedObj no) 626 throws IllegalActionException, NameDuplicationException 627 { 628 _params = new SQLRecordingParameters(no); 629 return _params; 630 } 631 632 /** React to a change in an attribute. 
*/ 633 @Override 634 public void attributeChanged(Attribute attribute) 635 throws IllegalActionException 636 { 637 String name = attribute.getName(); 638 String val; 639 640 //_debug("begin sql attr changed: " + attribute + " hash = " + _recorder.getContainer().hashCode() ); 641 642 DatabaseFactory.Parameter dbParameter = 643 DatabaseFactory.Parameter.getType(name); 644 645 if(dbParameter != null) 646 { 647 switch(dbParameter) 648 { 649 case USER: 650 val = 651 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 652 653 String dbUserStr = _dbParams.get(DatabaseFactory.Parameter.USER.getName()); 654 if(dbUserStr == null || !val.equals(dbUserStr)) 655 { 656 _dbParams.put(DatabaseFactory.Parameter.USER.getName(), val); 657 _needReconnectDB = true; 658 } 659 660 break; 661 662 case PASSWD: 663 val = 664 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 665 666 String dbPasswdStr = _dbParams.get(DatabaseFactory.Parameter.PASSWD.getName()); 667 if(dbPasswdStr == null || !val.equals(dbPasswdStr)) 668 { 669 _dbParams.put(DatabaseFactory.Parameter.PASSWD.getName(), val); 670 _needReconnectDB = true; 671 } 672 673 break; 674 675 case HOST: 676 val = 677 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 678 679 String dbHostStr = _dbParams.get(DatabaseFactory.Parameter.HOST.getName()); 680 if(dbHostStr == null || !val.equals(dbHostStr)) 681 { 682 _dbParams.put(DatabaseFactory.Parameter.HOST.getName(), val); 683 _needReconnectDB = true; 684 } 685 686 break; 687 688 case NAME: 689 val = 690 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 691 692 String dbNameStr = _dbParams.get(DatabaseFactory.Parameter.NAME.getName()); 693 if(dbNameStr == null || !val.equals(dbNameStr)) 694 { 695 _dbParams.put(DatabaseFactory.Parameter.NAME.getName(), val); 696 _needReconnectDB = true; 697 } 698 699 break; 700 701 case TYPE: 702 val = 703 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 704 705 if(_dbType == null || 
!val.equals(_dbType.getName())) 706 { 707 if(_dbType != null) 708 { 709 try 710 { 711 _dbType.disconnect(); 712 } 713 catch(SQLException e) 714 { 715 throw new IllegalActionException("Error " + 716 "disconnecting from database: " + e.getMessage()); 717 } 718 } 719 720 _dbParams.put(DatabaseFactory.Parameter.TYPE.getName(), val); 721 _dbType = DatabaseFactory.getType(val); 722 if(_dbType == null) 723 { 724 throw new IllegalActionException( 725 "Invalid database type: " + val); 726 } 727 _needReconnectDB = true; 728 } 729 730 break; 731 732 case PORT: 733 val = 734 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 735 736 String dbPortStr = _dbParams.get(DatabaseFactory.Parameter.PORT.getName()); 737 if(dbPortStr == null || !val.equals(dbPortStr)) 738 { 739 _dbParams.put(DatabaseFactory.Parameter.PORT.getName(), val); 740 _needReconnectDB = true; 741 } 742 743 break; 744 745 case TABLEPREFIX: 746 val = 747 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 748 749 String dbTablePrefixStr = _dbParams.get(DatabaseFactory.Parameter.TABLEPREFIX.getName()); 750 if(dbTablePrefixStr == null || !val.equals(dbTablePrefixStr)) 751 { 752 _dbParams.put(DatabaseFactory.Parameter.TABLEPREFIX.getName(), val); 753 _needReconnectDB = true; 754 } 755 756 break; 757 758 case JDBC_URL: 759 val = 760 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 761 762 String dbJDBCURLStr = _dbParams.get(DatabaseFactory.Parameter.JDBC_URL.getName()); 763 if(dbJDBCURLStr == null || !val.equals(dbJDBCURLStr)) 764 { 765 _dbParams.put(DatabaseFactory.Parameter.JDBC_URL.getName(), val); 766 _needReconnectDB = true; 767 } 768 769 break; 770 771 case CREATE_INDEXES: 772 val = 773 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 774 775 String createIndexesStr = _dbParams.get(DatabaseFactory.Parameter.CREATE_INDEXES.getName()); 776 if(createIndexesStr == null || !val.equals(createIndexesStr)) 777 { 778 
_dbParams.put(DatabaseFactory.Parameter.CREATE_INDEXES.getName(), val); 779 _needReconnectDB = true; 780 } 781 782 break; 783 784 default: 785 System.out.println("WARNING: unknown " + 786 "DatabaseFactory.Parameter type: " + 787 dbParameter.getName()); 788 break; 789 } 790 } 791 else if(name.equals(SQLRecordingParameters.wfNameParamStr)) 792 { 793 val = 794 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 795 796 if(_wfNameStr == null || !val.equals(_wfNameStr)) 797 { 798 _wfNameStr = val; 799 _needReconnectWF = true; 800 //_debug("wf name changed to " + _wfNameStr); 801 } 802 } 803 else if(name.equals("User Name")) 804 { 805 val = 806 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 807 808 // if empty, set default from system property 809 if(val.length() == 0) 810 { 811 val = System.getProperty("user.name"); 812 } 813 814 if(_wfUserStr == null || !val.equals(_wfUserStr)) 815 { 816 _wfUserStr = val; 817 } 818 819 // for backwards compatibility 820 String dbUserStr = _dbParams.get(DatabaseFactory.Parameter.USER.getName()); 821 if(dbUserStr == null || dbUserStr.length() == 0) 822 { 823 _dbParams.put(DatabaseFactory.Parameter.USER.getName(), val); 824 _needReconnectDB = true; 825 } 826 } 827 else 828 { 829 super.attributeChanged(attribute); 830 } 831 832 // get workflow contents if we need to reconnect to db 833 // or workflow 834 if(_needReconnectDB || _needReconnectWF) 835 { 836 _needWorkflowContents(true); 837 838 // XXX 839 if(_needReconnectWF) 840 { 841 _wfReset(); 842 } 843 844 // XXX need dbReset? 845 } 846 847 //_debug("end sql attr changed: " + attribute); 848 } 849 850 //////////////////////////////////////////////////////////////////////// 851 //// public variables //// 852 853 /** The token data is valid. */ 854 public static final int DATA_VALID = 0; 855 856 /** The token data was not recorded. */ 857 public static final int DATA_NONE = 1; 858 859 /** The token data was too large to completely store. 
*/ 860 public static final int DATA_TRUNCATED = 2; 861 862 // port io types 863 public enum PortDirection 864 { 865 In, 866 Out, 867 InOut; 868 869 static public PortDirection valueOf(int id) 870 { 871 if(id < 0 || id >= values().length) 872 { 873 return null; 874 } 875 else 876 { 877 return values()[id]; 878 } 879 } 880 }; 881 882 //////////////////////////////////////////////////////////////////////// 883 //// protected methods //// 884 885 /** Check parameters before (re)connecting to database or workflow. */ 886 protected void _checkParameters() throws IllegalActionException, 887 RecordingException 888 { 889 if(_needReconnectWF) 890 { 891 //_debug("checkParameters; need reconnectWF; name len = " + 892 // _wfNameStr.length() + " name = " + _wfNameStr); 893 894 _checkWorkflowName(); 895 } 896 897 if(_needReconnectDB) 898 { 899 if(_dbType == null) 900 { 901 throw new RecordingException("No database connection."); 902 } 903 904 // see if the jdbc url override is empty. 905 // if so, need to check individual parameters. 
906 String dbJDBCURLStr = _dbParams.get(DatabaseFactory.Parameter.JDBC_URL.getName()); 907 if(dbJDBCURLStr == null || dbJDBCURLStr.length() == 0) 908 { 909 String dbUserStr = _dbParams.get(DatabaseFactory.Parameter.USER.getName()); 910 if(_dbType.needUserForConnect() && 911 (dbUserStr == null || dbUserStr.length() == 0)) 912 { 913 throw new RecordingException("Need database user name."); 914 } 915 916 String dbPasswdStr = _dbParams.get(DatabaseFactory.Parameter.PASSWD.getName()); 917 if(_dbType.needPasswordForConnect() && 918 (dbPasswdStr == null || dbPasswdStr.length() == 0)) 919 { 920 throw new RecordingException("Need database password"); 921 } 922 923 String dbHost = _dbParams.get(DatabaseFactory.Parameter.HOST.getName()); 924 if(_dbType.needHostForConnect() && 925 (dbHost == null || dbHost.length() == 0)) 926 { 927 throw new RecordingException("Need database host name."); 928 } 929 930 String dbNameStr = _dbParams.get(DatabaseFactory.Parameter.NAME.getName()); 931 if(dbNameStr == null || dbNameStr.length() == 0) 932 { 933 throw new RecordingException("Need database name."); 934 } 935 } 936 } 937 938 /* 939 _debug("going to _connectProvStore: "); 940 _debug("wf: " + _wfNameStr); 941 _debug("user: " + _dbUserStr); 942 _debug("dbhost: " + _dbHostStr); 943 _debug("dbhost: " + _dbPortStr); 944 _debug("passwd: " + _dbPasswdStr); 945 */ 946 } 947 948 /** Check validity of workflow name parameter. */ 949 protected void _checkWorkflowName() throws RecordingException 950 { 951 if(_wfNameStr == null || _wfNameStr.length() == 0) 952 { 953 throw new RecordingException("Need workflow name."); 954 } 955 } 956 957 /** Create a Schema to reflect the v6 schema. */ 958 protected Schema _createSchema() 959 { 960 return Schemas.createSchemaV6(); 961 } 962 963 /** Reconnect to the database. 
*/ 964 protected void _reconnectDatabase(boolean resetDB) throws RecordingException 965 { 966 if(resetDB) 967 { 968 _dbReset(); 969 } 970 971 try 972 { 973 _dbType = DatabaseFactory.getConnectedDatabaseType(_dbParams, "provenance"); 974 Schemas.checkVersion(_dbType, _schema); 975 976 _maxDataSize = _getMaxTokenDataSize(); 977 _maxParameterValueSize = _getMaxParameterValueSize(); 978 979 //System.out.println("max data size = " + _maxDataSize); 980 //System.out.println("max param value size = " + __maxParameterValueSize); 981 } 982 catch(SQLException e) 983 { 984 _errorReset(); 985 throw new RecordingException("Error connecting or " + 986 "initializing database.\n" + _dbParams + "\n", e); 987 } 988 989 try 990 { 991 _nullPreparedStatements(); 992 _createPreparedStatements(); 993 } 994 catch(SQLException e) 995 { 996 throw new RecordingException("Error creating prepared " + 997 "statements: " + e.getMessage()); 998 } 999 1000 _needReconnectDB = false; 1001 } 1002 1003 /** Get the maximum size of a parameter value that can be stored in the 1004 * database. 1005 */ 1006 protected int _getMaxParameterValueSize() throws SQLException 1007 { 1008 return _dbType.getColumnSize("parameter", "value"); 1009 } 1010 1011 /** Get the maximum size of token data that can be stored in the 1012 * database. 1013 */ 1014 protected int _getMaxTokenDataSize() throws SQLException 1015 { 1016 return _dbType.getColumnSize("token_flow", "data"); 1017 } 1018 1019 /** Re-acquire the current workflow id. 
*/ 1020 protected void _reconnectWorkflow() throws RecordingException 1021 { 1022 _wfReset(); 1023 1024 _needReconnectWF = false; 1025 1026 int rc = _updateWorkflowID(false); 1027 1028 if(rc == WORKFLOW_ALREADY_EXISTS) 1029 { 1030 1031 } 1032 else if(rc == WORKFLOW_DOES_NOT_EXIST) 1033 { 1034 //_debug("workflow does not exist; attempting to create " + 1035 //"new one."); 1036 1037 //XXX need to check for return value 1038 _updateWorkflowID(true); 1039 } 1040 } 1041 1042 /** Initialize the prepared statements. */ 1043 protected void _createPreparedStatements() throws SQLException 1044 { 1045 // NOTE: for each PreparedStatement we check to see if 1046 // it has already been created by a subclass: it may 1047 // have a different columns for a table, or removed 1048 // the entire table. 1049 1050 if(_psWorkflowInsert == null && _schema.containsTable("workflow")) 1051 { 1052 _psWorkflowInsert = _dbType.getSQLInsert("workflow", "id", 1053 "name, version, action_id", "?, ?, ?"); 1054 } 1055 1056 if(_psWorkflowQuery == null && _schema.containsTable("workflow")) 1057 { 1058 _psWorkflowQuery = _dbType.getSQLSelect("workflow", "id", 1059 "name = ?"); 1060 } 1061 1062 if(_psActionInsert == null && _schema.containsTable("action")) 1063 { 1064 _psActionInsert = _dbType.getSQLInsert("action", "id", 1065 "parent_id, user, time", "?, ?, ?"); 1066 } 1067 1068 if(_psEntityInsert == null && _schema.containsTable("entity")) 1069 { 1070 _psEntityInsert = _dbType.getSQLInsert("entity", "id", 1071 "container_id, workflow_id, type, name", "?, ?, ?, ?"); 1072 } 1073 1074 if(_psEntityQuery == null && _schema.containsTable("entity")) 1075 { 1076 _psEntityQuery = _dbType.getSQLSelect("entity", "id", 1077 "workflow_id = ? AND name = ? 
AND type = ?"); 1078 } 1079 1080 if(_psActorInsert == null && _schema.containsTable("actor")) 1081 { 1082 _psActorInsert = _dbType.getSQLInsert("actor", "id, class", "?, ?"); 1083 } 1084 1085 if(_psDirectorInsert == null && _schema.containsTable("director")) 1086 { 1087 _psDirectorInsert = _dbType.getSQLInsert("director", "id, class", 1088 "?, ?"); 1089 } 1090 1091 if(_psParameterInsert == null && _schema.containsTable("parameter")) 1092 { 1093 _psParameterInsert = _dbType.getSQLInsert("parameter", 1094 "id, type, value", "?, ?, ?"); 1095 } 1096 1097 if(_psLinkInsert == null && _schema.containsTable("link")) 1098 { 1099 _psLinkInsert = _dbType.getSQLInsert("link", 1100 "id, end_point_1, end_point_2", "?, ?, ?"); 1101 } 1102 1103 if(_psPortInsert == null && _schema.containsTable("port")) 1104 { 1105 _psPortInsert = _dbType.getSQLInsert("port", 1106 "id, type, direction, multiport", "?, ?, ?, ?"); 1107 } 1108 1109 if(_psRelationInsert == null && _schema.containsTable("relation")) 1110 { 1111 _psRelationInsert = _dbType.getSQLInsert("relation", "id, width", 1112 "?, ?"); 1113 } 1114 1115 if(_psWorkflowExecStart == null && 1116 _schema.containsTable("workflow_exec")) 1117 { 1118 String defaultTimeStr = _dbType.getDefaultTimeStr(); 1119 _psWorkflowExecStart = _dbType.getSQLInsert("workflow_exec", "id", 1120 "workflow_id, user, start_time, end_time", "?, ?, ?, " + 1121 defaultTimeStr); 1122 } 1123 1124 if(_psWorkflowExecStop == null && 1125 _schema.containsTable("workflow_exec")) 1126 { 1127 _psWorkflowExecStop = _dbType.getSQLUpdate("workflow_exec", 1128 "end_time = ?", "id = ?"); 1129 } 1130 1131 if(_psActorFireStart == null && _schema.containsTable("actor_fire")) 1132 { 1133 String defaultTimeStr = _dbType.getDefaultTimeStr(); 1134 _psActorFireStart = _dbType.getSQLInsert("actor_fire", "id", 1135 "actor_id, wf_exec_id, start_time, type, end_time", 1136 "?, ?, ?, ?, " + defaultTimeStr); 1137 } 1138 1139 if(_psActorFireStop == null && 
_schema.containsTable("actor_fire")) 1140 { 1141 _psActorFireStop = _dbType.getSQLUpdate("actor_fire", 1142 "end_time = ?", "id = ?"); 1143 } 1144 1145 if(_psTokenFlowInsert == null && _schema.containsTable("token_flow")) 1146 { 1147 _psTokenFlowInsert = _dbType.getSQLInsert("token_flow", "id", 1148 "port_id, time, data, channel, is_read, fire_id, rw_fire_id, " + 1149 "data_description", "?, ?, ?, ?, ?, ?, ?, ?"); 1150 } 1151 } 1152 1153 /** Set our prepared statements to null. */ 1154 protected void _nullPreparedStatements() 1155 { 1156 _psWorkflowInsert = null; 1157 _psWorkflowQuery = null; 1158 _psActionInsert = null; 1159 _psEntityInsert = null; 1160 _psEntityQuery = null; 1161 _psActorInsert = null; 1162 _psDirectorInsert = null; 1163 _psParameterInsert = null; 1164 _psLinkInsert = null; 1165 _psPortInsert = null; 1166 _psRelationInsert = null; 1167 _psWorkflowExecStart = null; 1168 _psWorkflowExecStop = null; 1169 _psActorFireStart = null; 1170 _psActorFireStop = null; 1171 _psTokenFlowInsert = null; 1172 } 1173 1174 /** Update workflow ID either by retrieving the id from an existing 1175 * workflow in the database or creating a new one. 1176 * 1177 * @param isNew 1178 * @return 1179 */ 1180 protected int _updateWorkflowID(boolean isNew) throws RecordingException 1181 { 1182 int retval = SUCCESS; 1183 1184 // check to see if it already exists. 1185 int id = _getWorkflowId(); 1186 1187 // see if this is a new workflow 1188 if(isNew) 1189 { 1190 // see if it already exists 1191 if(id != RegEntity.UNKNOWN_ID) 1192 { 1193 retval = WORKFLOW_ALREADY_EXISTS; 1194 } 1195 else 1196 { 1197 1198 if(_wfUserStr == null) 1199 { 1200 throw new RecordingException("Need workflow user " + 1201 "name"); 1202 } 1203 1204 _addWorkflow(); 1205 } 1206 } 1207 // see if workflow was not in db 1208 else if(id == RegEntity.UNKNOWN_ID) 1209 { 1210 retval = WORKFLOW_DOES_NOT_EXIST; 1211 } 1212 else 1213 { 1214 int oldWfId = _wfId; 1215 1216 // workflow exists in db; get id from query. 
1217 _wfId = id; 1218 1219 // see if id is different 1220 if(oldWfId != _wfId) 1221 { 1222 _wfReset(); 1223 } 1224 } 1225 1226 return retval; 1227 } 1228 1229 /** Get the internal workflow id. */ 1230 protected int _getWorkflowId() throws RecordingException 1231 { 1232 // use the workflow name as the id string. 1233 return _getWorkflowId(_wfNameStr); 1234 } 1235 1236 /** Get the internal workflow id given a id string. If no workflow has 1237 * the id string, returns RegEntity.UNKNOWN_ID. 1238 */ 1239 protected int _getWorkflowId(String idStr) throws RecordingException 1240 { 1241 int retval = RegEntity.UNKNOWN_ID; 1242 1243 try 1244 { 1245 synchronized(_psWorkflowQuery) 1246 { 1247 _psWorkflowQuery.setString(1, idStr); 1248 ResultSet rs = _psWorkflowQuery.executeQuery(); 1249 1250 if(rs.next()) 1251 { 1252 retval = rs.getInt("id"); 1253 } 1254 rs.close(); 1255 } 1256 } 1257 catch(SQLException e) 1258 { 1259 throw new RecordingException("Error querying workflow.name: ", e); 1260 } 1261 1262 //_debug("getWorkflowId idStr = " + idStr + " found id = " + retval); 1263 //_debug(_psWorkflowQuery); 1264 1265 return retval; 1266 } 1267 1268 /** Reset when we use a different db connection. */ 1269 protected void _dbReset() throws RecordingException 1270 { 1271 _wfId = RegEntity.UNKNOWN_ID; 1272 _maxDataSize = -1; 1273 _needReconnectWF = true; 1274 _needReconnectDB = true; 1275 1276 _wfReset(); 1277 1278 if(_dbType != null) 1279 { 1280 try 1281 { 1282 _dbType.disconnect(); 1283 } 1284 catch(SQLException e) 1285 { 1286 throw new RecordingException("Error disconnection from " + 1287 "database.", e); 1288 } 1289 } 1290 } 1291 1292 /** Reset when we use a different workflow. */ 1293 protected void _wfReset() 1294 { 1295 _wfExecId = RegEntity.UNKNOWN_ID; 1296 _entityCacheTable.clear(); 1297 _fireStateTable.clear(); 1298 } 1299 1300 /** Reset when we encounter an error. 
*/ 1301 protected void _errorReset() throws RecordingException 1302 { 1303 _dbReset(); 1304 } 1305 1306 /** 1307 * This method opens the database connection if necessary. 1308 * @param allowReconnectWF - default true, only false if you want to force no _reconnectWF, e.g 1309 * for non-write events like delete 1310 */ 1311 protected void _checkConnection(boolean allowReconnectWF) throws RecordingException 1312 { 1313 // see if we need to (re)connect to DB 1314 if(_needReconnectDB || _needReconnectWF) 1315 { 1316 //_debug("in checkConnection: need to reconnectdb or wf"); 1317 _updateContainerName(); 1318 1319 try 1320 { 1321 _checkParameters(); 1322 1323 if(_schema == null) 1324 { 1325 _schema = _createSchema(); 1326 } 1327 1328 if(_needReconnectDB) 1329 { 1330 _reconnectDatabase(true); 1331 } 1332 1333 if(_needReconnectWF && allowReconnectWF) 1334 { 1335 _reconnectWorkflow(); 1336 } 1337 } 1338 catch(IllegalActionException e) 1339 { 1340 throw new RecordingException(_getExceptionMessage(e)); 1341 } 1342 } 1343 1344 } 1345 1346 /** Modify a NamedObj's full name. */ 1347 protected String _changeEntityFullName(String name) 1348 { 1349 return name; 1350 } 1351 1352 /** Check if a NamedObj has been inserted into the entity table. 1353 * 1354 * @param fullName fully qualified name of NamedObj. 1355 * @param displayName display name (NOTE: can be null) 1356 * @param type type of NamedObj. 
(NOTE: can be null) 1357 * @return 1358 */ 1359 protected RegEntity _checkEntity(Nameable namedObj, RegEntity.EntityType type) 1360 throws RecordingException 1361 { 1362 RegEntity retval = null; 1363 1364 String fullName = _getNameableFullName(namedObj); 1365 String displayName = namedObj.getDisplayName(); 1366 1367 _checkConnection(true); 1368 1369 String changedName = _changeEntityFullName(fullName); 1370 1371 String typeStr = type.toString(); 1372 1373 //_debug("getting from cache: " + cacheName); 1374 retval = _entityCacheTable.get(namedObj); 1375 1376 // see if it's cached 1377 if(retval != null) 1378 { 1379 //_debug("found existing id for " + _getNameableFullName(namedObj) + " : " + retval.getId()); 1380 1381 // it was cached, so no longer new. 1382 retval.setOld(); 1383 } 1384 else 1385 { 1386 try 1387 { 1388 // determine containing id 1389 int cntId = _getContainerId(namedObj); 1390 1391 // see if name is in entity table 1392 synchronized(_psEntityQuery) 1393 { 1394 _psEntityQuery.setInt(1, _wfId); 1395 _psEntityQuery.setString(2, changedName); 1396 _psEntityQuery.setString(3, typeStr); 1397 ResultSet rs = _psEntityQuery.executeQuery(); 1398 1399 boolean haveResult = rs.next(); 1400 1401 if(!haveResult) 1402 { 1403 // not in entity table, so insert 1404 retval = _addEntity(cntId, type, changedName, displayName, 1405 RegEntity.UNKNOWN_ID); 1406 1407 //_debug("added entity " + changedName + " id = " + retval.getId()); 1408 } 1409 else if(haveResult) 1410 { 1411 retval = new RegEntity(rs.getInt("id"), false, cntId, 1412 type); 1413 1414 //_debug("queried entity " + changedName + " id = " + retval.getId()); 1415 } 1416 1417 rs.close(); 1418 } 1419 } 1420 catch(SQLException e) 1421 { 1422 throw new RecordingException("ERROR querying entity: ", e); 1423 } 1424 1425 1426 if(retval == null) 1427 { 1428 //throw new RecordingException("Entity " + name + 1429 // " not registered."); 1430 } 1431 else 1432 { 1433 //_debug("caching " + cacheName); 1434 
_entityCacheTable.put(namedObj, retval); 1435 } 1436 } 1437 1438 return retval; 1439 } 1440 1441 /** Add a new row to the entity table. */ 1442 protected RegEntity _addEntity(int containerId, RegEntity.EntityType type, 1443 String fullName, String displayName, int prevId) 1444 throws RecordingException, SQLException 1445 { 1446 1447 int id = RegEntity.UNKNOWN_ID; 1448 1449 synchronized(_psEntityInsert) 1450 { 1451 _psEntityInsert.setInt(1, containerId); 1452 _psEntityInsert.setInt(2, _wfId); 1453 _psEntityInsert.setString(3, type.toString()); 1454 _psEntityInsert.setString(4, fullName); 1455 id = _dbType.insert(_psEntityInsert, "entity", "id"); 1456 } 1457 1458 RegEntity retval = new RegEntity(id, true, containerId, type); 1459 1460 //_debug("inserted " + fullName + ", id = " + id); 1461 1462 if(_debugWriter != null) 1463 { 1464 _debugWrite("INSERT INTO ENTITY(" + containerId + ", " + _wfId + 1465 ", " + type + ", " + fullName + ")"); 1466 } 1467 1468 return retval; 1469 } 1470 1471 /** Add a new row to the workflow table. */ 1472 protected void _addWorkflow() throws RecordingException 1473 { 1474 try 1475 { 1476 // insert the new action table row 1477 synchronized(_psActionInsert) 1478 { 1479 _psActionInsert.setInt(1, 0); 1480 _psActionInsert.setString(2, _wfUserStr); 1481 _psActionInsert.setTimestamp(3, new Timestamp(new Date().getTime())); 1482 int actionId = _dbType.insert(_psActionInsert, "action", "id"); 1483 1484 if(_debugWriter != null) 1485 { 1486 _debugWrite("INSERT INTO ACTION(0, " + _wfUserStr + 1487 ", curTime)"); 1488 } 1489 1490 //XXX should psWorkflowInsert be synchronized? 
1491 1492 // insert the new workflow table row 1493 _psWorkflowInsert.setString(1, _wfNameStr); 1494 _psWorkflowInsert.setString(2, "v0"); 1495 _psWorkflowInsert.setInt(3, actionId); 1496 _wfId = _dbType.insert(_psWorkflowInsert, "workflow","id"); 1497 _wfReset(); 1498 1499 if(_debugWriter != null) 1500 { 1501 _debugWrite("INSERT INTO WORKFLOW(" + _wfNameStr + 1502 ", v0, " + actionId + ")"); 1503 } 1504 } 1505 } 1506 catch(SQLException e) 1507 { 1508 throw new RecordingException("Error adding row to workflow:", e); 1509 } 1510 } 1511 1512 /** Find the container id of an entity. If entity has no container, i.e., 1513 * is the top-level object, returns 0. Otherwise if the container could 1514 * not be found, returns RegEntity.UNKNOWN_ID. */ 1515 protected int _getContainerId(Nameable namedObj) 1516 throws RecordingException 1517 { 1518 //_debug("_getContainerId(" + fullName + ")"); 1519 1520 int retval = RegEntity.UNKNOWN_ID; 1521 1522 NamedObj container = namedObj.getContainer(); 1523 1524 if(container == null) 1525 { 1526 retval = 0; 1527 } 1528 else 1529 { 1530 RegEntity.EntityType containerType = 1531 RegEntity.EntityType.getType(container); 1532 RegEntity re = _checkEntity(container, containerType); 1533 if(re == null) 1534 { 1535 //throw new RecordingException("Container for " + name + 1536 // " is not in entity table."); 1537 retval = RegEntity.UNKNOWN_ID; 1538 } 1539 else 1540 { 1541 retval = re.getId(); 1542 } 1543 } 1544 1545 //_debug("getContainerId " + retval + " for " + name); 1546 1547 return retval; 1548 } 1549 1550 /** Convenience routine that regActor and regDirector can use since 1551 * the schema for actors and directors only differs by "actor" vs 1552 * "director". 1553 */ 1554 protected int _regActorDirector(NamedObj namedObj, boolean actor) throws RecordingException 1555 { 1556 RegEntity.EntityType type = actor ? 
1557 RegEntity.EntityType.Actor : RegEntity.EntityType.Director; 1558 RegEntity re = _checkEntity(namedObj, type); 1559 if(re.isNew()) 1560 { 1561 1562 //_debug("is new actor " + namedObj.getFullName()); 1563 1564 PreparedStatement ps = null; 1565 1566 if(actor) 1567 { 1568 ps = _psActorInsert; 1569 } 1570 else 1571 { 1572 ps = _psDirectorInsert; 1573 } 1574 1575 try 1576 { 1577 String className = namedObj.getClassName(); 1578 1579 synchronized(ps) 1580 { 1581 ps.setInt(1, re.getId()); 1582 ps.setString(2, className); 1583 ps.executeUpdate(); 1584 } 1585 1586 if(_debugWriter != null) 1587 { 1588 String table = actor ? "ACTOR" : "DIRECTOR"; 1589 _debugWrite("INSERT INTO " + table + " (" + 1590 re.getId() + ", " + className + ")"); 1591 } 1592 1593 } 1594 catch(SQLException e) 1595 { 1596 String msg = "Error registering "; 1597 if(actor) 1598 { 1599 msg += "actor: "; 1600 } 1601 else 1602 { 1603 msg += "director: "; 1604 } 1605 1606 if(_debugWriter != null) 1607 { 1608 _debugWrite(msg, e); 1609 } 1610 1611 throw new RecordingException(msg, e); 1612 } 1613 } 1614 1615 return re.getId(); 1616 } 1617 1618 /** Add a port to the port table. */ 1619 protected void _regPortReal(TypedIOPort port, RegEntity re) 1620 throws RecordingException 1621 { 1622 int multi = port.isMultiport() ? 
1 : 0; 1623 1624 int direction; 1625 if(port.isInput() && port.isOutput()) 1626 { 1627 direction = PortDirection.InOut.ordinal(); 1628 } 1629 else if(port.isInput()) 1630 { 1631 direction = PortDirection.In.ordinal(); 1632 } 1633 else 1634 { 1635 direction = PortDirection.Out.ordinal(); 1636 } 1637 1638 try 1639 { 1640 synchronized(_psPortInsert) 1641 { 1642 _psPortInsert.setInt(1, re.getId()); 1643 _psPortInsert.setString(2, port.getType().toString()); 1644 _psPortInsert.setInt(3, direction); 1645 _psPortInsert.setInt(4, multi); 1646 _psPortInsert.executeUpdate(); 1647 } 1648 1649 if(_debugWriter != null) 1650 { 1651 _debugWrite("INSERT INTO PORT (id, type, " + direction + 1652 ", " + multi + ")"); 1653 } 1654 1655 } 1656 catch(SQLException e) 1657 { 1658 String msg = "Error inserting port " + _getNameableFullName(port) + ": "; 1659 _debugWrite(msg, e); 1660 throw new RecordingException(msg, e); 1661 } 1662 } 1663 1664 /** Add an entity to the parameter table. */ 1665 protected void _regParameterReal(NamedObj parameter, RegEntity re) 1666 throws RecordingException 1667 { 1668 String className = parameter.getClassName(); 1669 String valueStr = "none"; 1670 1671 try 1672 { 1673 synchronized(_psParameterInsert) 1674 { 1675 //_debug("going to insert parameter " + _getNameableFullName(parameter) + " with id " + re.getId()); 1676 1677 _psParameterInsert.setInt(1, re.getId()); 1678 _psParameterInsert.setString(2, className); 1679 1680 if(parameter instanceof AbstractSettableAttribute) 1681 { 1682 valueStr = ((AbstractSettableAttribute)parameter). 1683 getValueAsString(); 1684 } 1685 1686 // replace null string with empty string. 
1687 if(valueStr == null) 1688 { 1689 valueStr = ""; 1690 } 1691 1692 //_debug(_getNameableFullName(parameter) + " value length = " + valueStr.length()); 1693 1694 // XXX need to set truncated bit somewhere 1695 if(valueStr.length() > _maxParameterValueSize) 1696 { 1697 //_debug("TRUNCATING!"); 1698 valueStr = valueStr.substring(0, _maxParameterValueSize); 1699 } 1700 1701 _psParameterInsert.setString(3, valueStr); 1702 1703 _psParameterInsert.executeUpdate(); 1704 } 1705 1706 if(_debugWriter != null) 1707 { 1708 if(className.equals("org.kepler.moml.NamedObjId")) 1709 { 1710 _debugWrite("INSERT INTO PARAMETER (" + re.getId() + 1711 ", " + className + ", lsid)"); 1712 } 1713 else 1714 { 1715 _debugWrite("INSERT INTO PARAMETER (" + re.getId() + 1716 ", " + className + ", " + valueStr + ")"); 1717 } 1718 } 1719 } 1720 catch(SQLException e) 1721 { 1722 String msg = "Error registering parameter " + _getNameableFullName(parameter) + ": "; 1723 _debugWrite(msg, e); 1724 throw new RecordingException(msg, e); 1725 } 1726 } 1727 1728 /** Record a specific type of firing for an actor. */ 1729 protected void _recordFiringEvent(Actor actor, 1730 FiringEvent.FiringEventType type, Date timestamp) throws RecordingException 1731 { 1732 //_debug("firing event: " + type + " for " + _getNameableFullName(actor)); 1733 1734 // see if it's a start or stop. 
1735 FireState<Integer> fs = _fireStateTable.get(actor); 1736 1737 if(fs == null) 1738 { 1739 throw new RecordingException( 1740 "Received actor fire event for unregistered actor: " + 1741 _getNameableFullName(actor)); 1742 } 1743 1744 synchronized(fs) 1745 { 1746 try 1747 { 1748 if(type.isStart()) 1749 { 1750 int fireId; 1751 1752 synchronized(_psActorFireStart) 1753 { 1754 _psActorFireStart.setInt(1, fs.getActorId()); 1755 _psActorFireStart.setInt(2, _wfExecId); 1756 _psActorFireStart.setTimestamp(3,new Timestamp(timestamp.getTime())); 1757 _psActorFireStart.setString(4, type.getTypeName()); 1758 fireId = _dbType.insert(_psActorFireStart, "actor_fire", 1759 "id"); 1760 } 1761 1762 if(_debugWriter != null) 1763 { 1764 _debugWrite("INSERT INTO ACTOR_FIRE(" + 1765 fs.getActorId() + ", " + _wfExecId + ", " + 1766 "curTime, " + type.getTypeName() + ")"); 1767 } 1768 1769 fs.fireStart(type, fireId); 1770 } 1771 else 1772 { 1773 Integer fireId = fs.fireStop(type); 1774 1775 if(fireId == null) 1776 { 1777 String msg = "fire id is null for " + _getNameableFullName(actor); 1778 _debugWrite(msg); 1779 throw new RecordingException(msg); 1780 } 1781 1782 synchronized(_psActorFireStop) 1783 { 1784 _psActorFireStop.setTimestamp(1, new Timestamp(timestamp.getTime())); 1785 _psActorFireStop.setInt(2, fireId); 1786 _psActorFireStop.executeUpdate(); 1787 } 1788 1789 if(_debugWriter != null) 1790 { 1791 _debugWrite("UPDATE ACTOR_FIRE set end_time " + 1792 "= curTime WHERE id = " + fireId + ")"); 1793 } 1794 } 1795 } 1796 catch(SQLException e) 1797 { 1798 throw new RecordingException(_getExceptionMessage(e)); 1799 } 1800 } 1801 } 1802 1803 /** Record a port read or write to the token_flow table. 
*/ 1804 protected void _recordPortEvent(IOPort port, int fireId, int rwfireId, 1805 boolean isRead, int channel, Token token, IOPort destPort, Date timestamp) 1806 throws RecordingException 1807 { 1808 1809 String dataStr; 1810 int dataDesc; 1811 1812 String data = token.toString(); 1813 1814 // see if we have data 1815 /* FindBugs: data is known to be non-null 1816 if(data == null) 1817 { 1818 // we are not recording data 1819 dataStr = ""; 1820 dataDesc = DATA_NONE; 1821 } 1822 // see if the data is too large 1823 else*/ if(data.length() > _maxDataSize) 1824 { 1825 // store as much data as possible 1826 dataStr = data.substring(0, _maxDataSize); 1827 dataDesc = DATA_TRUNCATED; 1828 } 1829 else 1830 { 1831 // data not too large 1832 dataStr = data; 1833 dataDesc = DATA_VALID; 1834 } 1835 1836 RegEntity.EntityType type; 1837 if(port instanceof ParameterPort) 1838 { 1839 type = RegEntity.EntityType.PortParameter; 1840 } 1841 else 1842 { 1843 type = RegEntity.EntityType.Port; 1844 } 1845 1846 RegEntity re = _checkEntity(port, type); 1847 if(re == null) 1848 { 1849 throw new RecordingException("Port has not been registered: " + 1850 _getNameableFullName(port)); 1851 } 1852 1853 int portId = re.getId(); 1854 1855 try 1856 { 1857 synchronized(_psTokenFlowInsert) 1858 { 1859 _psTokenFlowInsert.setInt(1, portId); 1860 _psTokenFlowInsert.setTimestamp(2, new Timestamp(timestamp.getTime())); 1861 _psTokenFlowInsert.setString(3, dataStr); 1862 _psTokenFlowInsert.setInt(4, channel); 1863 1864 if(isRead) 1865 { 1866 _psTokenFlowInsert.setInt(5, 1); 1867 } 1868 else 1869 { 1870 _psTokenFlowInsert.setInt(5, 0); 1871 } 1872 1873 _psTokenFlowInsert.setInt(6, fireId); 1874 _psTokenFlowInsert.setInt(7, rwfireId); 1875 _psTokenFlowInsert.setInt(8, dataDesc); 1876 _psTokenFlowInsert.executeUpdate(); 1877 } 1878 1879 if(_debugWriter != null) 1880 { 1881 _debugWrite("INSERT INTO TOKEN_FLOW(" + portId + 1882 ", curTime, " + dataStr + ", " + channel + ", " + 1883 isRead + ", " + fireId + 
", " + rwfireId + ", " + 1884 dataDesc + ")"); 1885 } 1886 1887 } 1888 catch(SQLException e) 1889 { 1890 _error("port = " + _getNameableFullName(port) + ", " + 1891 "portId = " + portId + ", " + 1892 "fireId = " + fireId + ", " + 1893 "rwFireId = " + rwfireId + ", " + 1894 "isRead = " + isRead + ", " + 1895 "channel = " + channel + ", " + 1896 "data = " + data); 1897 e.printStackTrace(); 1898 _error("------------------"); 1899 throw new RecordingException(_getExceptionMessage(e)); 1900 } 1901 } 1902 1903 //////////////////////////////////////////////////////////////////////// 1904 //// protected variables //// 1905 1906 /** The schema containing table and column definitions. */ 1907 protected Schema _schema; 1908 1909 /** The current workflow id. */ 1910 protected int _wfId; 1911 1912 /** The current workflow execution id. */ 1913 protected int _wfExecId; 1914 1915 /** The maximum size in the SQL table for data values. */ 1916 protected int _maxDataSize; 1917 1918 /** The maximum size in the SQL table for parameter values. */ 1919 protected int _maxParameterValueSize; 1920 1921 /** A table to map actor to its firing state object. */ 1922 protected Map<Actor, FireState<Integer>> _fireStateTable = null; 1923 1924 /** A table to cache RegEntity objects. 
*/
    protected Map<Nameable,RegEntity> _entityCacheTable = null;

    // Prepared statements, one per table operation. Each one is created
    // in _createPreparedStatements() when it is still null and the schema
    // contains its table, and set back to null in
    // _nullPreparedStatements().

    /** Inserts a row into the workflow table. */
    protected PreparedStatement _psWorkflowInsert;
    /** Selects the id of a workflow by name. */
    protected PreparedStatement _psWorkflowQuery;
    /** Inserts a row into the action table. */
    protected PreparedStatement _psActionInsert;
    /** Inserts a row into the entity table. */
    protected PreparedStatement _psEntityInsert;
    /** Selects the id of an entity by workflow id, name and type. */
    protected PreparedStatement _psEntityQuery;
    /** Inserts a row into the actor table. */
    protected PreparedStatement _psActorInsert;
    /** Inserts a row into the director table. */
    protected PreparedStatement _psDirectorInsert;
    /** Inserts a row into the parameter table. */
    protected PreparedStatement _psParameterInsert;
    /** Inserts a row into the link table. */
    protected PreparedStatement _psLinkInsert;
    /** Inserts a row into the port table. */
    protected PreparedStatement _psPortInsert;
    /** Inserts a row into the relation table. */
    protected PreparedStatement _psRelationInsert;
    /** Inserts a row into the workflow_exec table. */
    protected PreparedStatement _psWorkflowExecStart;
    /** Sets the end time of a workflow_exec row. */
    protected PreparedStatement _psWorkflowExecStop;
    /** Inserts a row into the actor_fire table. */
    protected PreparedStatement _psActorFireStart;
    /** Sets the end time of an actor_fire row. */
    protected PreparedStatement _psActorFireStop;
    /** Inserts a row into the token_flow table. */
    protected PreparedStatement _psTokenFlowInsert;

    /** The workflow name; used to look up the workflow id and stored in
     *  the name column of new workflow rows.
     */
    protected String _wfNameStr = null;

    /** The user name stored in the action row created for a new
     *  workflow.
     */
    protected String _wfUserStr = "unknown";

    /** The following holds values from DatabaseFactory.Parameters.
     *  NOTE: use a LinkedHashMap for predictable iteration order.
     */
    protected Map<String,String> _dbParams = new LinkedHashMap<String,String>();

    /** Connection to the database. */
    protected DatabaseType _dbType;

    // return values of _updateWorkflowID()
    /** The workflow was added or found, as requested. */
    protected static final int SUCCESS = 0;
    /** A new workflow was requested, but one with this name already exists. */
    protected static final int WORKFLOW_ALREADY_EXISTS = 1;
    /** An existing workflow was requested, but none with this name exists. */
    protected static final int WORKFLOW_DOES_NOT_EXIST = 2;

    /** The sql parameters object. */
    protected SQLRecordingParameters _params = null;

    /** True when we need to reconnect to the database. */
    protected boolean _needReconnectDB = true;

    /** True when we need to load workflow id from database. */
    protected boolean _needReconnectWF = true;
}