001/* SQL provenance output to v8 schema. 002 003Copyright (c) 2008-2010 The Regents of the University of California. 004All rights reserved. 005Permission is hereby granted, without written agreement and without 006license or royalty fees, to use, copy, modify, and distribute this 007software and its documentation for any purpose, provided that the above 008copyright notice and the following two paragraphs appear in all copies 009of this software. 010 011IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015SUCH DAMAGE. 016 017THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022ENHANCEMENTS, OR MODIFICATIONS. 
023 024*/ 025 026package org.kepler.provenance.sql; 027 028import java.io.BufferedInputStream; 029import java.io.File; 030import java.io.FileInputStream; 031import java.io.FileNotFoundException; 032import java.io.FileOutputStream; 033import java.io.IOException; 034import java.io.InputStream; 035import java.io.OutputStream; 036import java.lang.ref.WeakReference; 037import java.math.BigInteger; 038import java.security.MessageDigest; 039import java.security.NoSuchAlgorithmException; 040import java.sql.PreparedStatement; 041import java.sql.ResultSet; 042import java.sql.SQLException; 043import java.sql.Timestamp; 044import java.util.ArrayList; 045import java.util.Collections; 046import java.util.Date; 047import java.util.HashMap; 048import java.util.HashSet; 049import java.util.Iterator; 050import java.util.LinkedList; 051import java.util.List; 052import java.util.Map; 053import java.util.Set; 054import java.util.WeakHashMap; 055import java.util.jar.JarEntry; 056import java.util.regex.Matcher; 057 058import org.kepler.kar.KAREntry; 059import org.kepler.kar.KARFile; 060import org.kepler.kar.ModuleDependencyUtil; 061import org.kepler.moml.NamedObjId; 062import org.kepler.moml.NamedObjIdReferralList; 063import org.kepler.objectmanager.lsid.KeplerLSID; 064import org.kepler.provenance.FireState; 065import org.kepler.provenance.IOPortRefillEvent; 066import org.kepler.provenance.PortConnector; 067import org.kepler.provenance.ProvenanceEvent; 068import org.kepler.provenance.QueryException; 069import org.kepler.provenance.Queryable; 070import org.kepler.provenance.RecordingException; 071import org.kepler.provenance.RecordingParameters; 072import org.kepler.sms.NamedOntClass; 073import org.kepler.sms.SMSServices; 074import org.kepler.sms.SemanticType; 075import org.kepler.tagging.TagEvent; 076import org.kepler.util.FileUtil; 077import org.kepler.util.RenameUtil; 078import org.kepler.util.WorkflowRenameListener; 079import org.kepler.util.WorkflowRun; 080import 
org.kepler.util.sql.Schema; 081 082import ptolemy.actor.Actor; 083import ptolemy.actor.Director; 084import ptolemy.actor.FiringEvent; 085import ptolemy.actor.IOPort; 086import ptolemy.actor.IORelation; 087import ptolemy.actor.TypedIOPort; 088import ptolemy.actor.gui.EditorFactory; 089import ptolemy.actor.lib.hoc.IterateOverArray; 090import ptolemy.actor.parameters.ParameterPort; 091import ptolemy.actor.parameters.PortParameter; 092import ptolemy.actor.sched.Scheduler; 093import ptolemy.data.ArrayToken; 094import ptolemy.data.BooleanToken; 095import ptolemy.data.IntToken; 096import ptolemy.data.RecordToken; 097import ptolemy.data.StringToken; 098import ptolemy.data.Token; 099import ptolemy.data.expr.Parameter; 100import ptolemy.domains.sdf.kernel.SDFDirector; 101import ptolemy.domains.sdf.lib.SampleDelay; 102import ptolemy.kernel.attributes.VersionAttribute; 103import ptolemy.kernel.util.Attribute; 104import ptolemy.kernel.util.IllegalActionException; 105import ptolemy.kernel.util.NameDuplicationException; 106import ptolemy.kernel.util.Nameable; 107import ptolemy.kernel.util.NamedObj; 108import ptolemy.kernel.util.SingletonAttribute; 109import ptolemy.kernel.util.StringAttribute; 110import ptolemy.moml.MoMLParser; 111import ptolemy.util.MessageHandler; 112 113 114/** SQL provenance output. 115 * 116 * <b>NOTE: The v8 schema is under development and likely to change.</b> 117 * 118 * This uses version 8 of the schema. 
New features: 119 * <ul> 120 * <li>Saving the workflow contents for each execution.</li> 121 * <li>Saving contents of files referenced in tokens.</li> 122 * <li>Each workflow and workflow execution has unique LSID.</li> 123 * <li>Workflow name no longer required.</li> 124 * <li>Explicit mapping from token write to token read.</li> 125 * <li>Combining actor firing events into a single "firing cycle event".</li> 126 * <li>Recording execution exceptions.</li> 127 * <li>Storing associated data for executions.</li> 128 * </ul> 129 * 130 * @author Derik Barseghian, Daniel Crawl, Ben Leinfelder, Sean Riddle 131 * @version $Id: SQLRecordingV8.java 34597 2017-08-10 23:34:49Z crawl $ 132 * 133 */ 134 135public class SQLRecordingV8 extends SQLRecordingV7 136{ 137 public SQLRecordingV8() throws RecordingException 138 { 139 super(); 140 141 _portConnector = new PortConnector<TokenInfo>(); 142 143 _queryable = null; 144 145 // start with empty workflow name 146 _wfNameStr = ""; 147 148 _nextExecLSIDStr = null; 149 150 _executionLSIDtoIdMap = new WeakHashMap<KeplerLSID, Integer>(); 151 152 _v8Set.add(new WeakReference<SQLRecordingV8>(this)); 153 154 _executionHadAnError = false; 155 } 156 157 /** Associate the contents of a file with the most recent workflow execution. */ 158 @Override 159 public void addFileForLastExecution(Map<String,String> metadataMap, File file) 160 throws RecordingException 161 { 162 addFileForExecution(metadataMap, file, RegEntity.UNKNOWN_ID); 163 } 164 165 /** Associate the contents of a file with the most recent workflow execution. 166 * If execId == RegEntity.UNKNOWN_ID, use last execution. 
167 */ 168 public void addFileForExecution(Map<String,String> metadataMap, File file, int execId) 169 throws RecordingException 170 { 171 int execIdToUse = execId; 172 173 if (execId == RegEntity.UNKNOWN_ID){ 174 if(_wfLastExecId == RegEntity.UNKNOWN_ID) 175 { 176 throw new RecordingException("No previous workflow execution."); 177 } 178 execIdToUse = _wfLastExecId; 179 } 180 181 if(metadataMap.size() == 0) 182 { 183 throw new RecordingException("Metadata map is empty."); 184 } 185 // add the file contents to data table if not already there 186 String fileMD5Str = _checkData(file); 187 188 try 189 { 190 synchronized(_psAssocDataInsert) 191 { 192 for(Map.Entry<String,String> entry : metadataMap.entrySet()) 193 { 194 _psAssocDataInsert.setString(1, fileMD5Str); 195 _psAssocDataInsert.setString(2, entry.getKey()); 196 _psAssocDataInsert.setString(3, entry.getValue()); 197 _psAssocDataInsert.setInt(4, execIdToUse); 198 _dbType.insert(_psAssocDataInsert, "associated_data", "id"); 199 200 if(_debugWriter != null) 201 { 202 _debugWrite("INSERT INTO ASSOCIATED_DATA (" + 203 fileMD5Str + ", " + entry.getKey() + ", " + 204 entry.getValue() + ", " + execIdToUse + ")"); 205 } 206 } 207 } 208 } 209 catch(SQLException e) 210 { 211 throw new RecordingException("Error trying to insert into " + 212 "associated_data table: " + e.getMessage()); 213 } 214 215 } 216 217 /** React to a change in an attribute. 
*/ 218 @Override 219 public void attributeChanged(Attribute attribute) 220 throws IllegalActionException 221 { 222 boolean notifyParent = true; 223 224 String name = attribute.getName(); 225 226 // see if the workflow name has changed 227 if(name.equals(SQLRecordingParameters.wfNameParamStr)) 228 { 229 String val = 230 ((StringToken)((Parameter)attribute).getToken()).stringValue(); 231 232 try 233 { 234 // see if name changed and we're already connected to the db 235 if(!val.equals(_wfNameStr) && !_needReconnectWF) 236 { 237 // change the wf name in the column 238 _changeWorkflowNameColumn(val); 239 notifyParent = false; 240 } 241 242 } 243 catch(RecordingException e) 244 { 245 throw new IllegalActionException(e.getMessage()); 246 } 247 } 248 else if(name.equals(SQLRecordingParametersV8.maxFileIncludeSizeKBStr)) 249 { 250 notifyParent = false; 251 252 Token token = ((Parameter)attribute).getToken(); 253 if(token == null) 254 { 255 _maxFileIncludeSizeKBVal = _maxFileIncludeSizeKBDefaultVal; 256 } 257 else 258 { 259 _maxFileIncludeSizeKBVal = ((IntToken)token).intValue(); 260 } 261 } 262 else if(name.equals(SQLRecordingParametersV8.execAnnotationStr)) 263 { 264 notifyParent = false; 265 266 _execAnnotation = ((StringToken)((Parameter)attribute).getToken()).stringValue(); 267 } 268 else if(name.equals(SQLRecordingParametersV8.nextExecLSIDStr)) 269 { 270 notifyParent = false; 271 272 _nextExecLSIDStr = ((StringToken)((Parameter)attribute).getToken()).stringValue(); 273 } 274 else if(name.equals(SQLRecordingParametersV8.watchForLSIDChangesStr)) 275 { 276 notifyParent = false; 277 278 _watchForLSIDChanges = ((BooleanToken)((Parameter)attribute).getToken()).booleanValue(); 279 } 280 281 if(notifyParent) 282 { 283 super.attributeChanged(attribute); 284 } 285 } 286 287 /** Add Parameters for ProvenanceRecorder. 
*/ 288 @Override 289 public RecordingParameters generateParameters(NamedObj no) 290 throws IllegalActionException, NameDuplicationException 291 { 292 _params = new SQLRecordingParametersV8(no); 293 return _params; 294 } 295 296 /** Register a port or portparameter. */ 297 @Override 298 public boolean regPort(TypedIOPort port) throws RecordingException 299 { 300 boolean retval = super.regPort(port); 301 302 _portConnector.createConnections(port); 303 304 return retval; 305 } 306 307 /** Record the starting of workflow execution. */ 308 @Override 309 public void executionStart(KeplerLSID executionLSID, Date timestamp) throws RecordingException 310 { 311 //System.out.println("execution start"); 312 313 if(_wfExecId != RegEntity.UNKNOWN_ID) 314 { 315 throw new RecordingException("Workflow already running."); 316 } 317 318 // NOTE: do not set this to false so that workflow contents 319 // will be registered again. this handles cases where entities 320 // are renamed between runs. 321 //_needWorkflowContents(false); 322 323 if(_wfUserStr == null) 324 { 325 throw new RecordingException("Need workflow user name."); 326 } 327 328 if(_machineStr == null) 329 { 330 throw new RecordingException("Need machine name."); 331 } 332 333 // update the db 334 try 335 { 336 byte[] wfBytes = _recorderContainer.exportMoML().getBytes(); 337 String wfMD5Str = _checkData(wfBytes); 338 339 String annotationStr; 340 if(_execAnnotation != null) 341 { 342 annotationStr = _execAnnotation; 343 } 344 else 345 { 346 annotationStr = ""; 347 } 348 349 // if _nextExecLSID defined, we should use it instead of runLSID 350 if(_nextExecLSIDStr != null && _nextExecLSIDStr.length() != 0) 351 { 352 try 353 { 354 executionLSID = new KeplerLSID(_nextExecLSIDStr); 355 } 356 catch(Exception e) 357 { 358 throw new RecordingException("Error converting given LSID " + 359 _nextExecLSIDStr + " to LSID: ", e); 360 } 361 362 // clear it so that we don't use it again. 
363 _nextExecLSIDStr = null; 364 } 365 366 // get the workflow lsid 367 KeplerLSID wfLSID = NamedObjId.getIdFor(_recorderContainer.toplevel()); 368 369 synchronized(_psWorkflowExecStart) 370 { 371 _psWorkflowExecStart.setInt(1, _wfId); 372 _psWorkflowExecStart.setString(2, _wfUserStr); 373 _psWorkflowExecStart.setTimestamp(3, new Timestamp(timestamp.getTime())); 374 _psWorkflowExecStart.setString(4, wfMD5Str); 375 _psWorkflowExecStart.setString(5, _machineStr); 376 _psWorkflowExecStart.setString(6, annotationStr); 377 _psWorkflowExecStart.setString(7, executionLSID.toString()); 378 _psWorkflowExecStart.setString(8, wfLSID.toString()); 379 _psWorkflowExecStart.setString(9, ModuleDependencyUtil. 380 buildModuleDependenciesString()); 381 _psWorkflowExecStart.setString(10, WorkflowRun.type.Running.toString()); 382 _wfExecId = _dbType.insert(_psWorkflowExecStart, 383 "workflow_exec", "id"); 384 } 385 386 _executionLSIDtoIdMap.put(executionLSID, _wfExecId); 387 388 //insert any workflow tags here 389 insertWorkflowTags(_wfExecId); 390 391 if(_debugWriter != null) 392 { 393 _debugWrite("INSERT INTO WORKFLOW_EXEC (" + _wfId + 394 ", " + _wfUserStr + ", curTime, md5, " + _machineStr + 395 ", " + annotationStr + ", lsid)"); 396 } 397 398 _updateParameterExecTable(); 399 400 } 401 catch(SQLException e) 402 { 403 throw new RecordingException(_getExceptionMessage(e)); 404 } 405 } 406 407 /** An actor threw an exception. */ 408 @Override 409 public void executionError(Nameable source, Throwable throwable, KeplerLSID executionLSID) 410 throws RecordingException 411 { 412 413 int entityId = RegEntity.UNKNOWN_ID; 414 if(source != null) 415 { 416 RegEntity entity = _entityCacheTable.get(source); 417 418 // XXX what if null? 
419 if(entity != null) 420 { 421 entityId = entity.getId(); 422 } 423 } 424 425 Integer execId = _executionLSIDtoIdMap.get(executionLSID); 426 if(execId == null) 427 { 428 throw new RecordingException("No id found for execution " + executionLSID); 429 } 430 431 String message = throwable.getMessage(); 432 int maxLength = 433 _schema.getTable("error").getColumn("message").getLength(); 434 435 // if there is a message and it is larger than the column, 436 // truncate it. 437 if(message != null && message.length() > maxLength) 438 { 439 message = message.substring(0, maxLength); 440 } 441 442 try 443 { 444 synchronized(_psErrorInsert) 445 { 446 _psErrorInsert.setInt(1, entityId); 447 _psErrorInsert.setInt(2, execId); 448 _psErrorInsert.setString(3, message); 449 _dbType.insert(_psErrorInsert, "error", "id"); 450 451 if(_debugWriter != null) 452 { 453 _debugWrite("INSERT INTO ERROR(" + entityId + ", " + 454 execId + ", " + throwable.getMessage() + ")"); 455 } 456 } 457 } 458 catch(SQLException e) 459 { 460 throw new RecordingException("Error inserting into error table: ", 461 e); 462 } 463 changeExecutionType(executionLSID, WorkflowRun.type.Error); 464 _executionHadAnError = true; 465 } 466 467 /** Record the stopping of workflow execution. */ 468 @Override 469 public void executionStop(KeplerLSID executionLSID, Date timestamp) throws RecordingException 470 { 471 // save the execution id. 472 _wfLastExecId = _wfExecId; 473 if (!_executionHadAnError) 474 { 475 changeExecutionType(executionLSID, WorkflowRun.type.Complete); 476 } 477 478 super.executionStop(timestamp); 479 } 480 481 /** Returns true if workflow contents should be registered. */ 482 @Override 483 public boolean regContents() throws RecordingException 484 { 485 // see if we're watching for workflow LSID changes and 486 // the workflow LSID is not null 487 if(_watchForLSIDChanges && _wfLSID != null) 488 { 489 // see if the workflow LSID has changed. 
490 KeplerLSID lsid = NamedObjId.getIdFor(_recorderContainer); 491 if(! lsid.equalsWithoutRevision(_wfLSID)) 492 { 493 //_debug("regContents: lsid appears to have changed."); 494 //_debug(" " + _wfLSID + " --> " + lsid); 495 496 // reconnect to the workflow so that we get the new LSID 497 _wfLSID = null; 498 _reconnectWorkflow(); 499 500 // return true so the workflow contents get registered. 501 return true; 502 } 503 } 504 505 return super.regContents(); 506 } 507 508 /** Register a link between two endpoints. */ 509 @Override 510 public boolean regLink(NamedObj endPoint1, NamedObj endPoint2) 511 throws RecordingException 512 { 513 // do nothing 514 return false; 515 } 516 517 /** Register a parameter. A parameter can be any <b>entity</b> 518 * stored in the MoML that does not have its own 519 * <code>regNNN()</code> method. This can be user-level 520 * parameters (e.g., Parameter, StringParameter, etc.) or 521 * internal to Kepler (e.g., _location, semanticType000, etc.). 522 * (A "parameter" corresponds to a property in the MoML). 523 * 524 */ 525 @Override 526 public boolean regParameter(NamedObj parameter) throws RecordingException 527 { 528 boolean skip = false; 529 530 String name = parameter.getName(); 531 NamedObj container = parameter.getContainer(); 532 533 // filter out certain types of parameters 534 if(parameter instanceof EditorFactory || 535 parameter instanceof Scheduler || 536 parameter instanceof SemanticType || 537 parameter instanceof SingletonAttribute || 538 parameter instanceof VersionAttribute) 539 { 540 skip = true; 541 } 542 // do not record class parameters since the 543 // type is already recorded and the class lsid is not used. 
544 else if(name.equals("class") && 545 parameter instanceof StringAttribute) 546 { 547 String val = ((StringAttribute)parameter).getExpression(); 548 String containerClass = 549 parameter.getContainer().getClass().getName(); 550 if(containerClass.equals(val)) 551 { 552 skip = true; 553 } 554 } 555 // do not record special attributes for ports 556 else if((container instanceof IOPort) && 557 (name.equals("tokenConsumptionRate") || 558 name.equals("tokenProductionRate") || 559 name.equals("_showName") || 560 name.equals("_hide"))) 561 { 562 skip = true; 563 } 564 // do not record internal parameter iterationCount in IterateOverArray 565 else if((container instanceof IterateOverArray) && 566 name.equals("iterationCount")) { 567 skip = true; 568 } 569 // do not record AUTO and UNBOUNDED in SDF 570 else if((container instanceof SDFDirector) && 571 (name.equals(SDFDirector.AUTO_NAME) || 572 name.equals(SDFDirector.UNBOUNDED_NAME))) { 573 skip = true; 574 } 575 // do not record start and stop time in Director 576 else if((container instanceof Director) && 577 (name.equals("startTime") || 578 name.equals("stopTime"))) { 579 skip = true; 580 } 581 582 583 if(skip) 584 { 585 return false; 586 } 587 else 588 { 589 return super.regParameter(parameter); 590 } 591 } 592 593 /** Register a relation. */ 594 @Override 595 public boolean regRelation(IORelation relation) throws RecordingException 596 { 597 // do nothing. 598 return false; 599 } 600 601 /** Record a port refill event. */ 602 @Override 603 public void refillPortEvent(IOPortRefillEvent event) throws RecordingException 604 { 605 //System.out.println("got refill event: " + event); 606 _portConnector.refillId(event.getPort(), event.getChannel()); 607 } 608 609 /** Get a Queryable connected to the Recording output. 610 * @param allowReconnectWF - false if you want to force no _reconnectWF 611 * @exception QueryException may be thrown if Queryable not implemented 612 * for the Recording type. 
613 * @throws RecordingException 614 */ 615 @Override 616 public Queryable getQueryable(boolean allowReconnectWF) throws QueryException, RecordingException 617 { 618 // make sure we're connected to db 619 _checkConnection(allowReconnectWF); 620 621 return _queryable; 622 } 623 624 /** A workflow was renamed. 625 * 626 * @param namedObj the workflow 627 * @param oldLSID the previous LSID 628 * @param newLSID the new LSID 629 * @param oldName the previous name 630 * @param newName the new name 631 * @throws RecordingException 632 * @see WorkflowRenameListener 633 */ 634 @Override 635 public void renamedWorkflow(NamedObj namedObj, KeplerLSID oldLSID, 636 KeplerLSID newLSID, String oldName, String newName) throws RecordingException 637 { 638 639 _checkConnection(true); 640 641 // make sure we're connected to the database 642 if(!_needReconnectDB && !_needReconnectWF) 643 { 644 Matcher matcher = RenameUtil.unnamedIdPattern.matcher(oldName); 645 646 // make sure that the id matches the unnamed regex. 647 if(oldName != null) 648 { 649 650 if (newName != null) 651 { 652 if (matcher.matches()) 653 { 654 try 655 { 656 _changeWorkflowNameColumn(newName, oldLSID); 657 } 658 catch (RecordingException e) 659 { 660 MessageHandler.error("Error changing workflow name.", e); 661 } 662 } 663 else 664 { 665 // just change this workflow name 666 _wfNameStr = newName; 667 668 // note: 669 // we call _changeWorkflowNameColumn, but with newLSID, 670 // because workflow has already been recorded at this point. 671 // An alternative might be to check for NamedObjIdChangeRequest 672 // in provenanceRecorder changeExecuted and somehow do 673 // the rename then, before _recordWorkflowContents. 674 try 675 { 676 _changeWorkflowNameColumn(newName, newLSID); 677 } 678 catch (RecordingException e) 679 { 680 MessageHandler.error("Error changing workflow name.", e); 681 } 682 } 683 } 684 } 685 } 686 } 687 688 /** Delete executions given a list of execution LSIDs. 
689 * 690 * @param lsidList 691 * @return 692 * @throws RecordingException 693 */ 694 public int deleteExecutions(List<KeplerLSID> lsidList) throws RecordingException 695 { 696 697 // make sure we're connected to db 698 _checkConnection(false); 699 700 int numRowsDeleted = 0; 701 702 String wfExecStr = _dbType.getTableName("workflow_exec"); 703 String dataStr = _dbType.getTableName("data"); 704 705 StringBuilder lsids = new StringBuilder(); 706 for (Iterator<KeplerLSID> i=lsidList.iterator(); i.hasNext(); ) 707 { 708 lsids.append("'"+i.next().toString()+"'"); 709 if(i.hasNext()) 710 { 711 lsids.append(","); 712 } 713 } 714 715 // delete the executions 716 String deleteExecutions = "DELETE FROM " + wfExecStr + " wfe " + 717 "WHERE wfe.lsid IN (" +lsids+ ")"; 718 try 719 { 720 _psDeleteExecutions = _dbType.getPrepStatement(deleteExecutions); 721 numRowsDeleted = _psDeleteExecutions.executeUpdate(); 722 } 723 catch (SQLException e) { 724 throw new RecordingException("Error deleting executions:", e); 725 } 726 727 // determine which rows of data table will become cruft 728 // after executions deletion occurs 729 String md5sToDelete = getDataMD5sToDelete(lsids); 730 731 // see if there are any rows in the data table to delete 732 if(md5sToDelete.length() > 0) 733 { 734 // now cleanup data table 735 String deleteData = "DELETE FROM " + dataStr + " d " + 736 "WHERE d.md5 IN (" +md5sToDelete+ ")"; 737 try 738 { 739 _psDeleteData = _dbType.getPrepStatement(deleteData); 740 _psDeleteData.executeUpdate(); 741 } 742 catch (SQLException e) { 743 throw new RecordingException("Error deleting data:", e); 744 } 745 } 746 747 // tags table now automatically taken care of by cascade 748 // now that it uses WF_EXEC_ID fk 749 deleteWorkflowsWithNoExecutions(); 750 751 return numRowsDeleted; 752 } 753 754 /** Delete workflows from the database that have no executions. 755 * @return the number of workflows deleted. 
756 */ 757 public int deleteWorkflowsWithNoExecutions() throws RecordingException 758 { 759 _checkConnection(false); 760 761 762 // see which workflows we're going to delete 763 List<Integer> wfIdsToBeDeleted = new LinkedList<Integer>(); 764 synchronized(_psWorkflowIdsForNoExecutions) 765 { 766 try 767 { 768 ResultSet result = null; 769 try 770 { 771 result = _psWorkflowIdsForNoExecutions.executeQuery(); 772 while(result.next()) 773 { 774 wfIdsToBeDeleted.add(result.getInt(1)); 775 } 776 } 777 finally 778 { 779 if(result != null) 780 { 781 result.close(); 782 } 783 } 784 } 785 catch(SQLException e) 786 { 787 throw new RecordingException("Error querying workflows to " + 788 "be deleted: ", e); 789 } 790 } 791 792 // see if any other recordings have this workflow open 793 // if so, tell them to reconnect to the workflow. 794 for(Integer wfId : wfIdsToBeDeleted) 795 { 796 synchronized(_v8Set) 797 { 798 // use an iterator so we can safely remove items. 799 Iterator<WeakReference<SQLRecordingV8>> iterator = _v8Set.iterator(); 800 while(iterator.hasNext()) 801 { 802 SQLRecordingV8 recording = iterator.next().get(); 803 804 // see if recording was garbage collected. 805 if(recording == null) 806 { 807 iterator.remove(); 808 } 809 else if(recording._wfId == wfId) 810 { 811 recording._needReconnectWF = true; 812 } 813 } 814 } 815 } 816 817 // delete the workflows. 818 // NOTE: this deletes all information about a workflow 819 // if it has no executions. 
820 synchronized(_psDeleteWorkflowsForNoExecutions) 821 { 822 try 823 { 824 int numDeleted = 825 _psDeleteWorkflowsForNoExecutions.executeUpdate(); 826 //System.out.println("deleted " + numDeleted + " workflows."); 827 return numDeleted; 828 } 829 catch(SQLException e) 830 { 831 throw new RecordingException("Unable to delete workflows " + 832 "with no runs: ", e); 833 } 834 } 835 } 836 837 /** A tag was added, insert it */ 838 @Override 839 public void tagAdded(TagEvent event) throws RecordingException 840 { 841 // make sure we're connected to db 842 _checkConnection(false); 843 844 String urnStr = event.getTag().getConceptId(); 845 String tagStr = event.getTag().toString(); 846 847 // get the entity id of the tagged object. 848 NamedObj namedObj = event.getSource(); 849 WorkflowRun run = null; 850 851 if (namedObj instanceof WorkflowRun) 852 { 853 run = (WorkflowRun)namedObj; 854 boolean alreadyInserted = checkIfTagAlreadyInserted(run.getExecId(), urnStr); 855 856 if(!alreadyInserted) 857 { 858 // TODO if we remove the caveat for not updateRevision on workflowRuns in 859 // Tagging.tagAdded, do not rollRevision here: 860 run.rollRevision(run.getExecLSID(), this); 861 insertTag(WorkflowRun.TAG_TYPE_RUN, tagStr, urnStr, run.getExecId()); 862 } 863 864 } 865 866 } 867 868 /** 869 * Parse workflow moml for tags, and insert them, associated with execID 870 * @param runLsid 871 */ 872 public void insertWorkflowTags(int execId) 873 { 874 try 875 { 876 NamedObj workflow = _recorderContainer.toplevel(); 877 Iterator<?> iter = SMSServices.getActorSemanticTypes(workflow).iterator(); 878 while (iter.hasNext()) 879 { 880 SemanticType semtype = (SemanticType) iter.next(); 881 insertTag(WorkflowRun.TAG_TYPE_WORKFLOW, semtype.getConceptName(), 882 semtype.getConceptId(), execId); 883 } 884 } 885 catch (Exception e) 886 { 887 // TODO Auto-generated catch block 888 e.printStackTrace(); 889 } 890 } 891 892 /** Insert a new row into the tag table. 
*/ 893 public void insertTag(String type, String tagStr, String urnStr, 894 int execId) throws RecordingException 895 { 896 synchronized(_psTagInsert) 897 { 898 try 899 { 900 _psTagInsert.setString(1, type); 901 _psTagInsert.setString(2, tagStr); 902 _psTagInsert.setString(3, urnStr); 903 _psTagInsert.setInt(4, execId); 904 _psTagInsert.execute(); 905 906 if(_debugWriter != null) 907 { 908 _debugWrite("INSERT INTO TAG(" + type + ", " + tagStr + 909 ", " + urnStr + ", "+ execId +")"); 910 } 911 912 } 913 catch(SQLException e) 914 { 915 throw new RecordingException("Could not insert tag:", e); 916 } 917 } 918 } 919 920 921 public boolean checkIfTagAlreadyInserted(int execId, String urn) 922 { 923 boolean found = false; 924 // see if tag is in table 925 synchronized(_psGetTagForExecIdAndURN) 926 { 927 try 928 { 929 ResultSet result = null; 930 try 931 { 932 _psGetTagForExecIdAndURN.setInt(1, execId); 933 _psGetTagForExecIdAndURN.setString(2, urn); 934 result = _psGetTagForExecIdAndURN.executeQuery(); 935 if(result.next()) 936 { 937 found = true; 938 } 939 } 940 finally 941 { 942 if(result != null) 943 { 944 result.close(); 945 } 946 } 947 } 948 catch(SQLException e) 949 { 950 e.printStackTrace(); 951 } 952 } 953 return found; 954 } 955 956 957 /** Record a custom provenance event. 
*/ 958 @Override 959 public void customProvEvent(ProvenanceEvent event) throws RecordingException 960 { 961 if(_wfExecId == RegEntity.UNKNOWN_ID) 962 { 963 throw new RecordingException("Workflow must be running to save associated data."); 964 } 965 966 final Map<String,String> map = event.getMap(); 967 968 if(map.isEmpty()) 969 { 970 throw new RecordingException("Metadata map is empty."); 971 } 972 973 try 974 { 975 synchronized(_psAssocDataInsertNoDataId) 976 { 977 for(Map.Entry<String,String> entry : map.entrySet()) 978 { 979 _psAssocDataInsertNoDataId.setString(1, entry.getKey()); 980 _psAssocDataInsertNoDataId.setString(2, entry.getValue()); 981 _psAssocDataInsertNoDataId.setInt(3, _wfExecId); 982 _dbType.insert(_psAssocDataInsertNoDataId, "associated_data", "id"); 983 984 if(_debugWriter != null) 985 { 986 _debugWrite("INSERT INTO ASSOCIATED_DATA (" + 987 entry.getKey() + ", " + 988 entry.getValue() + ", " + 989 _wfExecId + ")"); 990 } 991 } 992 } 993 } 994 catch(SQLException e) 995 { 996 throw new RecordingException("Error trying to insert into " + 997 "associated_data table: " + e.getMessage()); 998 } 999 } 1000 1001 /** A tag was removed. */ 1002 @Override 1003 public void tagRemoved(TagEvent event) throws RecordingException 1004 { 1005 // make sure we're connected to db 1006 _checkConnection(false); 1007 1008 String urnStr = event.getTag().getConceptId(); 1009 NamedObj namedObj = event.getSource(); 1010 1011 // just dealing with WorkflowRuns here. 
We don't want to 1012 // allow the removal of a tag on a workflow that's already run 1013 //RegEntity entity = _entityCacheTable.get(namedObj); 1014 1015 if (namedObj instanceof WorkflowRun) 1016 { 1017 WorkflowRun run = (WorkflowRun)namedObj; 1018 synchronized(_psDeleteTagForExecIdAndURN) 1019 { 1020 try 1021 { 1022 boolean alreadyInserted = checkIfTagAlreadyInserted(run.getExecId(), urnStr); 1023 if (alreadyInserted){ 1024 _psDeleteTagForExecIdAndURN.setInt(1, run.getExecId()); 1025 _psDeleteTagForExecIdAndURN.setString(2, urnStr); 1026 _psDeleteTagForExecIdAndURN.execute(); 1027 run.rollRevision(run.getExecLSID(), this); 1028 } 1029 } 1030 catch(SQLException e) 1031 { 1032 throw new RecordingException("Error deleting tag: ", e); 1033 } 1034 } 1035 } 1036 } 1037 1038 /** If run's exec LSID does not already exist in provenance, insert WorkflowRun 1039 * into provenance. This is a special case for 'preview' runs, where no other 1040 * associated data (workflow data, reporting files) are inserted. 1041 * 1042 * @param karFile 1043 * @param run 1044 * @throws RecordingException 1045 */ 1046 public boolean insertPreviewRun(WorkflowRun run) throws RecordingException 1047 { 1048 1049 Queryable q = null; 1050 try 1051 { 1052 q = getQueryable(false); //this will make sure we're connected to db 1053 } 1054 catch (QueryException e1) 1055 { 1056 // TODO Auto-generated catch block 1057 e1.printStackTrace(); 1058 return false; 1059 } 1060 1061 try 1062 { 1063 // don't insert if some form of this execution already exists in 1064 // this provenance store. 1065 1066 // don't insert if run lsid is the same as one of the oldest referrals 1067 // of an execution in this prov store (without rev). 1068 KeplerLSID runLSID = run.getExecLSID(); 1069 Integer execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision( 1070 runLSID.toStringWithoutRevision()); 1071 if (execId != null) 1072 { 1073 _warn("SQLRecordingV8 insertHuskRun - Not inserting run! 
run:"+execId+ 1074 " has the same originalExecLSID without rev:"+ 1075 runLSID.toStringWithoutRevision()); 1076 return false; 1077 } 1078 1079 // don't insert if one of the oldest referrals of an execution in this 1080 // prov store (without rev) is the same as the oldest referral of this run 1081 // (without rev) 1082 KeplerLSID originalExecLSID = null; 1083 List<KeplerLSID>refs = run.getReferralList(); 1084 if (refs != null && !refs.isEmpty()) 1085 { 1086 originalExecLSID = refs.get(refs.size()-1); 1087 execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision( 1088 originalExecLSID.toStringWithoutRevision()); 1089 } 1090 if (execId != null) 1091 { 1092 _warn("SQLRecordingV8 insertPreviewRun - Not inserting run! run:"+execId+ 1093 " has the same originalExecLSID without rev:"+ 1094 originalExecLSID.toStringWithoutRevision()); 1095 return false; 1096 } 1097 1098 // don't insert if the execution originated in this prov store 1099 // (ie oldest referral (without rev) is in this prov store's execution list) 1100 if (originalExecLSID != null) 1101 { 1102 execId = q.getExecutionForExecutionLSIDWithoutRevision( 1103 originalExecLSID.toStringWithoutRevision()); 1104 if (execId != null) 1105 { 1106 _warn("SQLRecordingV8 insertPreviewRun - Not inserting run! " + 1107 "Run originated on this machine and still exists - run:"+ 1108 execId+"'s lsid is the same as this run's originalExecLSID " + 1109 "without rev:"+originalExecLSID.toStringWithoutRevision()); 1110 return false; 1111 } 1112 } 1113 1114 // don't insert if the execution (without rev) is in this 1115 // prov store's execution list 1116 execId = q.getExecutionForExecutionLSIDWithoutRevision( 1117 run.getExecLSID().toStringWithoutRevision()); 1118 if (execId != null) 1119 { 1120 _warn("SQLRecordingV8 insertPreviewRun - Not inserting run! 
" + 1121 "run:"+ execId+"'s lsid is same as this run's lsid without rev:"+ 1122 run.getExecLSID().toStringWithoutRevision()); 1123 return false; 1124 } 1125 1126 1127 Integer workflowExecId = null; 1128 1129 // only insert workflow if not already in provenance 1130 int workflowId = _getWorkflowId(run.getWorkflowLSID().toStringWithoutRevision()); 1131 if (workflowId == RegEntity.UNKNOWN_ID) 1132 { 1133 synchronized(_psWorkflowInsert) 1134 { 1135 _psWorkflowInsert.setString(1, run.getWorkflowName()); 1136 _psWorkflowInsert.setString(2, run.getWorkflowLSID().toStringWithoutRevision()); 1137 workflowId = _dbType.insert(_psWorkflowInsert, "workflow", "id"); 1138 } 1139 } 1140 1141 // now insert workflow exec start info 1142 synchronized(_psWorkflowExecStart) 1143 { 1144 // NOTE we do not have the workflow for preview run, 1145 // so we use fake workflow data for the Data table. 1146 byte[] fakeWfBytes = "".getBytes(); 1147 String wfMD5Str = _checkData(fakeWfBytes); 1148 1149 _psWorkflowExecStart.setInt(1, workflowId); 1150 _psWorkflowExecStart.setString(2, run.getUser()); 1151 _psWorkflowExecStart.setTimestamp(3, 1152 new Timestamp(run.getStartTime().getTime())); 1153 _psWorkflowExecStart.setString(4, wfMD5Str); 1154 _psWorkflowExecStart.setString(5, run.getHostId()); 1155 _psWorkflowExecStart.setString(6, run.getAnnotation()); 1156 _psWorkflowExecStart.setString(7, run.getExecLSID().toString()); 1157 _psWorkflowExecStart.setString(8, run.getWorkflowLSID().toString()); 1158 _psWorkflowExecStart.setString(9, run.getModuleDependencies()); 1159 if (run.getErrorMessages().isEmpty()){ 1160 _psWorkflowExecStart.setString(10, WorkflowRun.type.Preview.toString()); 1161 } 1162 else{ 1163 _psWorkflowExecStart.setString(10, WorkflowRun.type.Preview_Error.toString()); 1164 } 1165 workflowExecId = _dbType.insert(_psWorkflowExecStart, 1166 "workflow_exec", "id"); 1167 } 1168 1169 // if workflow exec start info insert worked, now insert exec stop info 1170 // and reporting items and 
error messages if they exist 1171 if (workflowExecId != RegEntity.UNKNOWN_ID) 1172 { 1173 synchronized(_psWorkflowExecStop) 1174 { 1175 Timestamp endTime = new Timestamp(run.getStartTime().getTime() + 1176 (run.getDuration()*numMillisecondsInASecond)); 1177 _psWorkflowExecStop.setTimestamp(1, endTime); 1178 _psWorkflowExecStop.setInt(2, workflowExecId); 1179 _psWorkflowExecStop.executeUpdate(); 1180 } 1181 1182 KeplerLSID runLSIDBeforeExecIdChange = run.getExecLSID(); 1183 1184 // replace original execId. this must be before call to insertAnyHuskRunErrorMessages 1185 run.resetExecId(workflowExecId, this); 1186 1187 //insert any referrals 1188 insertAnyHuskRunReferralList(run); 1189 1190 insertAnyHuskRunErrorMessages(run); 1191 1192 insertAnyHuskRunTags(run); 1193 } 1194 else 1195 { 1196 return false; 1197 } 1198 1199 return true; 1200 } 1201 catch (SQLException e) 1202 { 1203 // TODO Auto-generated catch block 1204 e.printStackTrace(); 1205 return false; 1206 } 1207 catch (Exception e) 1208 { 1209 // TODO Auto-generated catch block 1210 e.printStackTrace(); 1211 return false; 1212 } 1213 1214 } 1215 1216 /** If run's exec LSID does not already exist in provenance, insert WorkflowRun 1217 * into provenance. This is a special case where the run may come from a KAR, 1218 * potentially from a different system. 1219 * 1220 * @param karFile 1221 * @param run 1222 * @throws RecordingException 1223 */ 1224 public boolean insertHuskRun(KARFile karFile, WorkflowRun run) throws RecordingException 1225 { 1226 1227 Queryable q = null; 1228 try 1229 { 1230 q = getQueryable(false); //this will make sure we're connected to db 1231 } 1232 catch (QueryException e1) 1233 { 1234 // TODO Auto-generated catch block 1235 e1.printStackTrace(); 1236 return false; 1237 } 1238 1239 try 1240 { 1241 // don't insert if some form of this execution already exists in 1242 // this provenance store. 
1243 1244 // don't insert if run lsid is the same as one of the oldest referrals 1245 // of an execution in this prov store (without rev). 1246 KeplerLSID runLSID = run.getExecLSID(); 1247 Integer execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision( 1248 runLSID.toStringWithoutRevision()); 1249 if (execId != null) 1250 { 1251 _warn("SQLRecordingV8 insertHuskRun - Not inserting run! run:"+execId+ 1252 " has the same originalExecLSID without rev:"+ 1253 runLSID.toStringWithoutRevision()); 1254 return false; 1255 } 1256 1257 // don't insert if one of the oldest referrals of an execution in this 1258 // prov store (without rev) is the same as the oldest referral of this run 1259 // (without rev) 1260 KeplerLSID originalExecLSID = null; 1261 List<KeplerLSID>refs = run.getReferralList(); 1262 if (refs != null && !refs.isEmpty()) 1263 { 1264 originalExecLSID = refs.get(refs.size()-1); 1265 execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision( 1266 originalExecLSID.toStringWithoutRevision()); 1267 } 1268 if (execId != null) 1269 { 1270 _warn("SQLRecordingV8 insertHuskRun - Not inserting run! run:"+execId+ 1271 " has the same originalExecLSID without rev:"+ 1272 originalExecLSID.toStringWithoutRevision()); 1273 return false; 1274 } 1275 1276 // don't insert if the execution originated in this prov store 1277 // (ie oldest referral (without rev) is in this prov store's execution list) 1278 if (originalExecLSID != null) 1279 { 1280 execId = q.getExecutionForExecutionLSIDWithoutRevision( 1281 originalExecLSID.toStringWithoutRevision()); 1282 if (execId != null) 1283 { 1284 _warn("SQLRecordingV8 insertHuskRun - Not inserting run! 
" + 1285 "Run originated on this machine and still exists - run:"+ 1286 execId+"'s lsid is the same as this run's originalExecLSID " + 1287 "without rev:"+originalExecLSID.toStringWithoutRevision()); 1288 return false; 1289 } 1290 } 1291 1292 // don't insesrt if the execution (without rev) is in this 1293 // prov store's execution list 1294 execId = q.getExecutionForExecutionLSIDWithoutRevision( 1295 run.getExecLSID().toStringWithoutRevision()); 1296 if (execId != null) 1297 { 1298 _warn("SQLRecordingV8 insertHuskRun - Not inserting run! " + 1299 "run:"+ execId+"'s lsid is same as this run's lsid without rev:"+ 1300 run.getExecLSID().toStringWithoutRevision()); 1301 return false; 1302 } 1303 1304 1305 Integer workflowExecId = null; 1306 1307 // only insert workflow if not already in provenance 1308 int workflowId = _getWorkflowId(run.getWorkflowLSID().toStringWithoutRevision()); 1309 if (workflowId == RegEntity.UNKNOWN_ID) 1310 { 1311 synchronized(_psWorkflowInsert) 1312 { 1313 _psWorkflowInsert.setString(1, run.getWorkflowName()); 1314 _psWorkflowInsert.setString(2, run.getWorkflowLSID().toStringWithoutRevision()); 1315 workflowId = _dbType.insert(_psWorkflowInsert, "workflow", "id"); 1316 } 1317 } 1318 1319 // now insert workflow exec start info 1320 synchronized(_psWorkflowExecStart) 1321 { 1322 //byte[] wfBytes = om.getHighestObjectRevision(run.getWorkflowLSID()).exportMoML().getBytes(); 1323 List<KAREntry> karEntries = karFile.karEntries(); 1324 KAREntry workflowEntry = null; 1325 for (KAREntry karEntry: karEntries){ 1326 if (karEntry.getLSID().equals(run.getWorkflowLSID())){ 1327 workflowEntry = karEntry; 1328 break; 1329 } 1330 } 1331 // FIXME: workflowEntry could be null 1332 // FIXME: close workflowStream? 
1333 InputStream workflowStream = karFile.getInputStream(workflowEntry); 1334 String workflowString = FileUtil.convertStreamToString(workflowStream); 1335 //TODO would be better to avoid parsing the workflow (but getting it from OM 1336 // is not desirable either, it's not always there) -- see r22173. 1337 // Parsing can cause bug related to text provenance type -- a second prov 1338 // recorder is created and tries to write to the same text file. 1339 // But discussed w/ Dan, this is rare and text type is only used for debugging. 1340 MoMLParser parser = new MoMLParser(); 1341 NamedObj workflow = parser.parse(workflowString); 1342 byte[] wfBytes = workflow.exportMoML().getBytes(); 1343 1344 String wfMD5Str = _checkData(wfBytes); 1345 1346 _psWorkflowExecStart.setInt(1, workflowId); 1347 _psWorkflowExecStart.setString(2, run.getUser()); 1348 _psWorkflowExecStart.setTimestamp(3, 1349 new Timestamp(run.getStartTime().getTime())); 1350 _psWorkflowExecStart.setString(4, wfMD5Str); 1351 _psWorkflowExecStart.setString(5, run.getHostId()); 1352 _psWorkflowExecStart.setString(6, run.getAnnotation()); 1353 _psWorkflowExecStart.setString(7, run.getExecLSID().toString()); 1354 _psWorkflowExecStart.setString(8, run.getWorkflowLSID().toString()); 1355 _psWorkflowExecStart.setString(9, run.getModuleDependencies()); 1356 if (run.getErrorMessages().isEmpty()){ 1357 _psWorkflowExecStart.setString(10, WorkflowRun.type.Imported.toString()); 1358 } 1359 else{ 1360 _psWorkflowExecStart.setString(10, WorkflowRun.type.Imported_Error.toString()); 1361 } 1362 workflowExecId = _dbType.insert(_psWorkflowExecStart, 1363 "workflow_exec", "id"); 1364 } 1365 1366 // if workflow exec start info insert worked, now insert exec stop info 1367 // and reporting items and error messages if they exist 1368 if (workflowExecId != RegEntity.UNKNOWN_ID) 1369 { 1370 synchronized(_psWorkflowExecStop) 1371 { 1372 Timestamp endTime = new Timestamp(run.getStartTime().getTime() + 1373 
(run.getDuration()*numMillisecondsInASecond)); 1374 _psWorkflowExecStop.setTimestamp(1, endTime); 1375 _psWorkflowExecStop.setInt(2, workflowExecId); 1376 _psWorkflowExecStop.executeUpdate(); 1377 } 1378 1379 KeplerLSID runLSIDBeforeExecIdChange = run.getExecLSID(); 1380 1381 // replace original execId. this must be before call to insertAnyHuskRunErrorMessages 1382 run.resetExecId(workflowExecId, this); 1383 1384 //insert any referrals 1385 insertAnyHuskRunReferralList(run); 1386 1387 //insert any reporting files that might be in KAR 1388 insertAnyHuskRunReportFiles(run, runLSIDBeforeExecIdChange, karFile); 1389 1390 insertAnyHuskRunErrorMessages(run); 1391 1392 insertAnyHuskRunTags(run); 1393 } 1394 else 1395 { 1396 return false; 1397 } 1398 1399 return true; 1400 } 1401 catch (SQLException e) 1402 { 1403 // TODO Auto-generated catch block 1404 e.printStackTrace(); 1405 return false; 1406 } 1407 catch (Exception e) 1408 { 1409 // TODO Auto-generated catch block 1410 e.printStackTrace(); 1411 return false; 1412 } 1413 1414 } 1415 1416 1417 public void insertAnyHuskRunReferralList(WorkflowRun run) 1418 { 1419 StringAttribute referralListAttr = (StringAttribute) run.getAttribute(NamedObjIdReferralList.NAME); 1420 if (referralListAttr != null) 1421 { 1422 try 1423 { 1424 _psChangeExecutionReferralList.setString(1, referralListAttr.getExpression()); 1425 _psChangeExecutionReferralList.setInt(2, run.getExecId()); 1426 _psChangeExecutionReferralList.execute(); 1427 } 1428 catch (Exception e) 1429 { 1430 // TODO Auto-generated catch block 1431 e.printStackTrace(); 1432 } 1433 } 1434 } 1435 1436 private void insertAnyHuskRunReportFiles(WorkflowRun run, KeplerLSID runLSIDBeforeExecIdChange, KARFile karFile) 1437 { 1438 1439 try 1440 { 1441 // get all entries that are ReportLayoutKAREntryHandler and ReportInstanceKAREntryHandler 1442 List<KAREntry> entries = karFile.karEntries(); 1443 Iterator<KAREntry> itr = entries.iterator(); 1444 while (itr.hasNext()) 1445 { 1446 
KAREntry karEntry = itr.next(); 1447 String handler = karEntry.getHandler(); 1448 String name = karEntry.getName(); 1449 String type = karEntry.getType(); 1450 1451 //FIXME hardcoded strings 1452 if (handler.endsWith("ReportLayoutKAREntryHandler") || 1453 handler.endsWith("ReportInstanceKAREntryHandler")) 1454 { 1455 if (karEntry.dependsOn(runLSIDBeforeExecIdChange)){ 1456 insertHuskRunReportFile(name, type, karFile, run.getExecId()); 1457 } 1458 } 1459 1460 } 1461 } 1462 catch (Exception e) 1463 { 1464 // TODO Auto-generated catch block 1465 e.printStackTrace(); 1466 } 1467 } 1468 1469 private void insertAnyHuskRunErrorMessages(WorkflowRun run) throws RecordingException 1470 { 1471 try 1472 { 1473 Map<Integer, String> errorMsgs = run.getErrorMessages(); 1474 1475 Iterator<Integer> itr = errorMsgs.keySet().iterator(); 1476 while (itr.hasNext()){ 1477 String errorMsg = errorMsgs.get(itr.next()); 1478 synchronized(_psErrorInsert) 1479 { 1480 //TODO I believe it makes sense to not include the original entity id 1481 // since we may be on a different system. But is UNKNOWN_ID what we want? 
1482 _psErrorInsert.setInt(1, RegEntity.UNKNOWN_ID); 1483 _psErrorInsert.setInt(2, run.getExecId()); 1484 _psErrorInsert.setString(3, errorMsg); 1485 _dbType.insert(_psErrorInsert, "error", "id"); 1486 1487 if(_debugWriter != null) 1488 { 1489 _debugWrite("INSERT INTO ERROR(" + RegEntity.UNKNOWN_ID + ", " + 1490 run.getExecId() + ", " + errorMsg + ")"); 1491 } 1492 } 1493 } 1494 } 1495 catch(SQLException e) 1496 { 1497 throw new RecordingException("Error inserting into error table: ",e); 1498 } 1499 catch (RecordingException e) 1500 { 1501 // TODO Auto-generated catch block 1502 e.printStackTrace(); 1503 } 1504 } 1505 1506 1507 private void insertAnyHuskRunTags(WorkflowRun run) throws RecordingException 1508 { 1509 1510 Map<NamedOntClass, String> tags = run.getTags(); 1511 for(Map.Entry<NamedOntClass, String> entry : tags.entrySet()) 1512 { 1513 NamedOntClass tag = entry.getKey(); 1514 String type = entry.getValue(); 1515 tag.getConceptId(); 1516 type = tags.get(tag); 1517 insertTag(type, tag.toString(), tag.getConceptId(), run.getExecId()); 1518 } 1519 1520 } 1521 1522 1523 private void insertHuskRunReportFile(String reportItem, String type, KARFile karFile, int runExecId) 1524 { 1525 1526 //any reason file should be gotten out of ObjectManager instead of straight from kar? 
1527 //File reportLayoutFile = karFile.getKAREntry(reportLayoutLSID); 1528 //NamedObj reportLayout = ObjectManager.getInstance().getHighestObjectRevision(reportLayoutLSID); 1529 1530 JarEntry je = karFile.getJarEntry(reportItem); 1531 Map<String, String>metadataMap = new HashMap<String, String>(); 1532 byte[] buf = new byte[1024]; 1533 int len = 0; 1534 1535 try 1536 { 1537 File file = File.createTempFile("keplerTemp", je.getName()); 1538 try(InputStream in = new BufferedInputStream(karFile.getInputStream(je)); 1539 OutputStream out = new FileOutputStream(file.getAbsolutePath());) 1540 { 1541 while ((len = in.read(buf)) > 0) 1542 { 1543 out.write(buf, 0, len); 1544 } 1545 } 1546 1547 metadataMap.put("type", type); 1548 addFileForExecution(metadataMap, file, runExecId); 1549 boolean deleted = file.delete(); 1550 if (!deleted){ 1551 System.out.println("SQLRecordingV8 WARN - unable to delete file:"+file.toString()); 1552 } 1553 } 1554 catch (IOException e) 1555 { 1556 // TODO Auto-generated catch block 1557 e.printStackTrace(); 1558 } 1559 catch (RecordingException e) 1560 { 1561 // TODO Auto-generated catch block 1562 e.printStackTrace(); 1563 } 1564 } 1565 1566 1567 //helper method determines which rows in data table can be deleted when deleting workflow executions 1568 private String getDataMD5sToDelete(StringBuilder lsids) throws RecordingException 1569 { 1570 1571 StringBuilder md5sToDelete = new StringBuilder(); 1572 1573 String wfExecStr = _dbType.getTableName("workflow_exec"); 1574 String associatedDataStr = _dbType.getTableName("associated_data"); 1575 String portEventStr = _dbType.getTableName("port_event"); 1576 String actorFireStr = _dbType.getTableName("actor_fire"); 1577 1578 // FIXME: declare all PreparedStatements locally, 1579 // close all PreperatedStatements and ResultSets. 
1580 1581 try 1582 { 1583 // gather md5s (wf_contents_id) for workflow_exec 1584 String wfContentsIdsToDelete = "SELECT DISTINCT wfe.wf_contents_id FROM " + wfExecStr + 1585 " wfe WHERE wfe.lsid IN (" +lsids+ ")"; 1586 _psGetWfContentsIdsToDeleteQuery = _dbType.getPrepStatement(wfContentsIdsToDelete); 1587 ResultSet resultwfContents = _psGetWfContentsIdsToDeleteQuery.executeQuery(); 1588 1589 // gather md5s (wf_contents_id) for workflow_exec that are still referred to and can't be deleted 1590 String wfContentsIdsUsedByOthers = "SELECT DISTINCT wfe.wf_contents_id FROM " + wfExecStr + 1591 " wfe WHERE wfe.lsid NOT IN (" +lsids+ ")"; 1592 _psGetWfContentsIdsToNotDeleteQuery = _dbType.getPrepStatement(wfContentsIdsUsedByOthers); 1593 ResultSet resultwfContents2 = _psGetWfContentsIdsToNotDeleteQuery.executeQuery(); 1594 1595 // gather md5s (data_id) from associated_data 1596 String associatedDataDataIdsToDelete = "SELECT ad.data_id FROM "+associatedDataStr+" ad WHERE ad.wf_exec_id IN " + 1597 "(SELECT wfe.id FROM "+wfExecStr+ " wfe WHERE wfe.lsid IN (" +lsids+ "))"; 1598 _psGetAssociatedDataDataIdsToDeleteQuery = _dbType.getPrepStatement(associatedDataDataIdsToDelete); 1599 ResultSet resultAssociatedDataDataIds = _psGetAssociatedDataDataIdsToDeleteQuery.executeQuery(); 1600 1601 // gather md5s (data_id) from associated_data that are still referred to and can't be deleted 1602 String associatedDataDataIdsToNotDelete = "SELECT ad.data_id FROM "+associatedDataStr+" ad WHERE ad.wf_exec_id IN " + 1603 "(SELECT wfe.id FROM "+wfExecStr+ " wfe WHERE wfe.lsid NOT IN (" +lsids+ "))"; 1604 _psGetAssociatedDataDataIdsToNotDeleteQuery = _dbType.getPrepStatement(associatedDataDataIdsToNotDelete); 1605 ResultSet resultAssociatedDataDataIds2 = _psGetAssociatedDataDataIdsToNotDeleteQuery.executeQuery(); 1606 1607 // gather md5s (data_id) from port_event 1608 String portEventDataIdsToDelete = "SELECT DISTINCT pe.data_id FROM "+portEventStr+" pe WHERE pe.fire_id IN " + 1609 "(SELECT 
af.id FROM "+actorFireStr+" af WHERE af.wf_exec_id IN " + 1610 "(SELECT wfe.id FROM "+wfExecStr+" wfe WHERE wfe.lsid IN ("+lsids+")))"; 1611 // written this way, the query is slower. 1612 //String portEventDataIdsToDelete2 = "SELECT DISTINCT pe.data_id FROM "+ portEventStr+" pe,"+ actorFireStr+" af,"+ 1613 // wfExecStr+" wfe WHERE pe.fire_id = af.id AND af.wf_exec_id = wfe.id AND wfe.lsid IN ("+lsids+")"; 1614 1615 _psGetPortEventDataIdsToDeleteQuery = _dbType.getPrepStatement(portEventDataIdsToDelete); 1616 ResultSet resultPortEventDataIds = _psGetPortEventDataIdsToDeleteQuery.executeQuery(); 1617 1618 // gather md5s (data_id) from port_event that are still referred to and can't be deleted 1619 String portEventDataIdsToNotDelete = "SELECT DISTINCT pe.data_id FROM "+portEventStr+" pe WHERE pe.fire_id IN " + 1620 "(SELECT af.id FROM "+actorFireStr+" af WHERE af.wf_exec_id IN " + 1621 "(SELECT wfe.id FROM "+wfExecStr+" wfe WHERE wfe.lsid NOT IN ("+lsids+")))"; 1622 // written this way, the query is slower. 
1623 //String portEventDataIdsToNotDelete2 = "SELECT DISTINCT pe.data_id FROM "+ portEventStr+" pe,"+ actorFireStr+" af,"+ 1624 // wfExecStr+" wfe WHERE pe.fire_id = af.id AND af.wf_exec_id = wfe.id AND wfe.lsid NOT IN ("+lsids+")"; 1625 1626 _psGetPortEventDataIdsToNotDeleteQuery = _dbType.getPrepStatement(portEventDataIdsToNotDelete); 1627 ResultSet resultPortEventDataIds2 = _psGetPortEventDataIdsToNotDeleteQuery.executeQuery(); 1628 1629 List<String> md5s = new ArrayList<String>(); 1630 while(resultwfContents.next()) 1631 { 1632 md5s.add(resultwfContents.getString("wf_contents_id")); 1633 } 1634 resultwfContents.close(); 1635 while (resultwfContents2.next()) 1636 { 1637 md5s.remove(resultwfContents2.getString("wf_contents_id")); 1638 } 1639 resultwfContents2.close(); 1640 while(resultAssociatedDataDataIds.next()) 1641 { 1642 md5s.add(resultAssociatedDataDataIds.getString("data_id")); 1643 } 1644 resultAssociatedDataDataIds.close(); 1645 while (resultAssociatedDataDataIds2.next()) 1646 { 1647 md5s.remove(resultAssociatedDataDataIds2.getString("data_id")); 1648 } 1649 resultAssociatedDataDataIds2.close(); 1650 while(resultPortEventDataIds.next()) 1651 { 1652 md5s.add(resultPortEventDataIds.getString("data_id")); 1653 } 1654 resultPortEventDataIds.close(); 1655 while (resultPortEventDataIds2.next()) 1656 { 1657 md5s.remove(resultPortEventDataIds2.getString("data_id")); 1658 } 1659 resultPortEventDataIds2.close(); 1660 1661 Iterator<String> itr = md5s.iterator(); 1662 while (itr.hasNext()) 1663 { 1664 md5sToDelete.append("'"+itr.next()+"',"); 1665 } 1666 1667 // remove trailing comma 1668 if (md5sToDelete.length() > 0) 1669 { 1670 md5sToDelete.delete(md5sToDelete.length()-1, md5sToDelete.length()); 1671 } 1672 1673 } 1674 catch (SQLException e) 1675 { 1676 throw new RecordingException("Error while determining which rows in data table to delete:", e); 1677 } 1678 1679 return md5sToDelete.toString(); 1680 } 1681 1682 
//////////////////////////////////////////////////////////////////////// 1683 //// protected methods //// 1684 1685 /** Add a new row to the entity table. */ 1686 @Override 1687 protected RegEntity _addEntity(int containerId, RegEntity.EntityType type, 1688 String fullName, String displayName, int prevId) 1689 throws RecordingException, SQLException 1690 { 1691 // see if we've recording the workflow change 1692 if(_evolId == RegEntity.UNKNOWN_ID) 1693 { 1694 _addWorkflowChange(); 1695 } 1696 1697 // see if display name is different 1698 String rowDisplayName; 1699 1700 String name = fullName.substring(fullName.lastIndexOf(".") + 1); 1701 1702 if(name.equals(displayName)) 1703 { 1704 rowDisplayName = ""; 1705 } 1706 else 1707 { 1708 rowDisplayName = displayName; 1709 } 1710 1711 String typeStr = type.toString(); 1712 1713 //(wf_change_id, container_id, type, name, prev_id, wf_id) " + 1714 //System.out.println("evolId = " + _evolId); 1715 1716 _psEntityInsert.setInt(1, _evolId); 1717 _psEntityInsert.setString(2, typeStr); 1718 _psEntityInsert.setString(3, fullName); 1719 _psEntityInsert.setString(4, rowDisplayName); 1720 _psEntityInsert.setInt(5, prevId); 1721 _psEntityInsert.setInt(6, _wfId); 1722 int newId = _dbType.insert(_psEntityInsert, "entity", "id"); 1723 RegEntity retval = new RegEntity(newId, true, containerId, type); 1724 1725 if(_debugWriter != null) 1726 { 1727 _debugWrite("INSERT INTO ENTITY (" + _evolId + ", " + type + 1728 ", " + fullName + ", " + rowDisplayName + ", " + prevId + 1729 ", " + _wfId + ")"); 1730 } 1731 1732 return retval; 1733 } 1734 1735 /** Add a new row to the workflow table. 
*/ 1736 @Override 1737 protected void _addWorkflow() throws RecordingException 1738 { 1739 if(_wfLSID == null) 1740 { 1741 throw new RecordingException("No workflow LSID"); 1742 } 1743 1744 //System.out.println("INSERTING new workflow " + _wfLSID.toString()); 1745 1746 try 1747 { 1748 synchronized(_psWorkflowInsert) 1749 { 1750 _psWorkflowInsert.setString(1, _wfNameStr); 1751 _psWorkflowInsert.setString(2, _wfLSID.toStringWithoutRevision()); 1752 _wfId = _dbType.insert(_psWorkflowInsert, "workflow", "id"); 1753 _wfReset(); 1754 } 1755 1756 if(_debugWriter != null) 1757 { 1758 _debugWrite("INSERT INTO WORKFLOW(" + _wfNameStr + ", lsid)"); 1759 } 1760 } 1761 catch(SQLException e) 1762 { 1763 throw new RecordingException("Error adding row to workflow:", e); 1764 } 1765 } 1766 1767 /** Create a new row in the workflow_change table. */ 1768 @Override 1769 protected void _addWorkflowChange() throws RecordingException 1770 { 1771 if(_wfUserStr == null) 1772 { 1773 throw new RecordingException("Need workflow user name."); 1774 } 1775 1776 if(_machineStr == null) 1777 { 1778 throw new RecordingException("Need machine name."); 1779 } 1780 1781 try 1782 { 1783 synchronized(_psWorkflowChangeInsert) 1784 { 1785 _psWorkflowChangeInsert.setString(1, _wfUserStr); 1786 _psWorkflowChangeInsert.setTimestamp(2, 1787 _workflowChangeTimeStack.peek()); 1788 _psWorkflowChangeInsert.setInt(3, _wfId); 1789 _psWorkflowChangeInsert.setString(4, _machineStr); 1790 _evolId = _dbType.insert(_psWorkflowChangeInsert, 1791 "workflow_change", "id"); 1792 //System.out.println("got new evolId = " + _evolId); 1793 1794 if(_debugWriter != null) 1795 { 1796 _debugWrite("INSERT INTO WORKFLOW_CHANGE(" + 1797 _wfUserStr + ", wfChangeTime, " + _wfId + ", " + 1798 _machineStr + ")"); 1799 } 1800 } 1801 } 1802 catch(SQLException e) 1803 { 1804 throw new RecordingException("Error adding to workflow_change: ", 1805 e); 1806 } 1807 } 1808 1809 /** Modify a NamedObj's full name. 
We remove the containing composite's 1810 * name, including the period. 1811 */ 1812 @Override 1813 protected String _changeEntityFullName(String name) 1814 { 1815 //_debug("name = " + name + " | containerFullName = " + _containerFullName); 1816 //_debug(" real container name = " + _recorder.getContainer().getFullName()); 1817 1818 // see if the name is our container's 1819 if(name.equals(_containerFullName)) 1820 { 1821 // see if we have a relative name 1822 if(_containerName == null || _containerName.length() == 0) 1823 { 1824 return ""; 1825 } 1826 else 1827 { 1828 return _containerFullName.substring(_containerFullName.indexOf(".", 1)); 1829 } 1830 } 1831 // see if it is the Graph Editor's default blank composite. 1832 else if(name.equals(".") && _containerFullName.equals( 1833 ".configuration.effigyFactory.Graph Editor.blank")) 1834 { 1835 return ""; 1836 } 1837 else 1838 { 1839 String relativeName; 1840 1841 // NOTE: _containerFullName may not be a substring of name. 1842 // this can happen, e.g., when using the ModelReference actor 1843 // see http://bugzilla.ecoinformatics.org/show_bug.cgi?id=4359 1844 // 1845 // instead of using _containerFullName, remove everything up to 1846 // the second period. e.g., .foo.bar becomes .bar 1847 // 1848 1849 int index = name.indexOf(".", 1); 1850 1851 // if there is no second period, i.e., name is the top level 1852 // composite, return an empty string. 1853 if(index == -1) 1854 { 1855 relativeName = ""; 1856 } 1857 else 1858 { 1859 relativeName = name.substring(index); 1860 } 1861 1862 //_debug("name " + name + " --> " + relativeName + 1863 //" (container name = " + _containerFullName + ")"); 1864 return relativeName; 1865 } 1866 } 1867 1868 /** Check parameters before (re)connecting to database or workflow. 
*/ 1869 /* 1870 protected void _checkParameters() throws IllegalActionException, 1871 RecordingException 1872 { 1873 if((_wfNameStr == null || _wfNameStr.length() == 0) && 1874 (_containerFullName.equals(".") || _containerFullName.equals( 1875 ".configuration.effigyFactory.Graph Editor.blank"))) 1876 { 1877 _wfNameStr = TMP_START_NAME_STR + _random.nextInt(); 1878 1879 _debug("checkParameters() setting workflow name to: " 1880 + _wfNameStr); 1881 _debug(" container is " + _containerFullName + " hash = " 1882 + _recorder.getContainer().hashCode()); 1883 } 1884 1885 super._checkParameters(); 1886 } 1887 */ 1888 1889 /** Check validity of workflow name parameter. */ 1890 @Override 1891 protected void _checkWorkflowName() throws RecordingException 1892 { 1893 // do nothing since workflow name not required. 1894 } 1895 1896 /** Initialize the prepared statements. */ 1897 @Override 1898 protected void _createPreparedStatements() throws SQLException 1899 { 1900 if(_psWorkflowInsert == null && _schema.containsTable("workflow")) 1901 { 1902 _psWorkflowInsert = _dbType.getSQLInsert("workflow", "id", 1903 "name, lsid", "?, ?"); 1904 } 1905 1906 if(_psWorkflowQuery == null && _schema.containsTable("workflow")) 1907 { 1908 _psWorkflowQuery = _dbType.getSQLSelect("workflow", "id", 1909 "lsid = ?"); 1910 } 1911 1912 if(_psActorFireStart == null && _schema.containsTable("actor_fire")) 1913 { 1914 String defaultTimeStr = _dbType.getDefaultTimeStr(); 1915 _psActorFireStart = _dbType.getSQLInsert("actor_fire", "id", 1916 "actor_id, wf_exec_id, start_time, end_time", "?, ?, ?, " + 1917 defaultTimeStr); 1918 } 1919 1920 if(_psEntityInsert == null && _schema.containsTable("entity")) 1921 { 1922 _psEntityInsert = _dbType.getSQLInsert("entity", "id", 1923 "wf_change_id, type, name, display, prev_id, " + 1924 "wf_id", "?, ?, ?, ?, ?, ?"); 1925 } 1926 1927 if(_psWorkflowChangeInsert == null && 1928 _schema.containsTable("workflow_change")) 1929 { 1930 _psWorkflowChangeInsert = 
_dbType.getSQLInsert("workflow_change", 1931 "id", "user, time, wf_id, host_id", "?, ?, ?, ?"); 1932 } 1933 1934 if(_psWorkflowExecStart == null && 1935 _schema.containsTable("workflow_exec")) 1936 { 1937 String defaultTimeStr = _dbType.getDefaultTimeStr(); 1938 _psWorkflowExecStart = _dbType.getSQLInsert("workflow_exec", "id", 1939 "wf_id, user, start_time, wf_contents_id, end_time, host_id, " + 1940 "annotation, lsid, wf_full_lsid, module_dependencies, type", 1941 "?, ?, ?, ?, " + defaultTimeStr + ", ?, ?, ?, ?, ?, ?"); 1942 } 1943 1944 // port.type was removed. 1945 if(_psPortInsert == null && _schema.containsTable("port")) 1946 { 1947 _psPortInsert = _dbType.getSQLInsert("port", 1948 "id, direction, multiport", "?, ?, ?"); 1949 } 1950 1951 if(_psPortEventInsertWithChecksum == null && _schema.containsTable("port_event")) 1952 { 1953 _psPortEventInsertWithChecksum = _dbType.getSQLInsert("port_event", "id", 1954 //"channel, data_id, fire_id, file_id, port_id, rw_fire_id, " + 1955 "channel, data_id, fire_id, file_id, port_id, " + 1956 "time, type, write_event_id", 1957 "?, ?, ?, ?, ?, ?, ?, ?"); 1958 } 1959 1960 if(_psPortEventInsert == null && _schema.containsTable("port_event")) 1961 { 1962 _psPortEventInsert = _dbType.getSQLInsert("port_event", "id", 1963 "channel, data, fire_id, file_id, port_id, " + 1964 "time, type, write_event_id", 1965 "?, ?, ?, ?, ?, ?, ?, ?"); 1966 } 1967 1968 if(_psDataInsert == null && _schema.containsTable("data")) 1969 { 1970 _psDataInsert = _dbType.getSQLInsert("data", 1971 "contents, md5, truncated", "?, ?, ?"); 1972 } 1973 1974 if(_psDataMD5Query == null && _schema.containsTable("data")) 1975 { 1976 _psDataMD5Query = _dbType.getSQLSelect("data", "md5", 1977 "md5 = ?"); 1978 } 1979 /* 1980 //Note. not setting these up currently, see Note1 in SQLQueryV8. 
1981 if(_psGetWfContentsIdsToDeleteQuery == null && _schema.containsTable("workflow_exec")) 1982 { 1983 String wfExecStr = _dbType.getTableName("workflow_exec"); 1984 String wfContentsIdsOfToDelete = "SELECT DISTINCT wfe.wf_contents_id FROM " + wfExecStr + 1985 " wfe WHERE wfe.lsid IN ("; 1986 int someLargeNumber = 10000; 1987 for (int i=0; i< someLargeNumber; i++){ 1988 wfContentsIdsOfToDelete.concat("?,?,?,?,?,?,?,?,?,?"); 1989 } 1990 wfContentsIdsOfToDelete.concat(")"); 1991 _psGetWfContentsIdsToDeleteQuery = _dbType.getPrepStatement(wfContentsIdsOfToDelete); 1992 } 1993 if(_psGetWfContentsIdsToNotDeleteQuery == null && ...) 1994 { 1995 } 1996 if(_psGetAssociatedDataDataIdsToDeleteQuery == null && ...) 1997 { 1998 } 1999 if(_psGetAssociatedDataDataIdsToNotDeleteQuery == null && ...) 2000 { 2001 } 2002 if(_psGetPortEventDataIdsToDeleteQuery == null && ...) 2003 { 2004 } 2005 if(_psGetPortEventDataIdsToNotDeleteQuery == null && ...) 2006 { 2007 } 2008 */ 2009 2010 if(_psChangeWorkflowName == null && _schema.containsTable("workflow")) 2011 { 2012 _psChangeWorkflowName = _dbType.getSQLUpdate("workflow", 2013 "name = ?", "lsid = ?"); 2014 } 2015 2016 if(_psChangeExecutionLSID == null && _schema.containsTable("workflow_exec")) 2017 { 2018 _psChangeExecutionLSID = _dbType.getSQLUpdate("workflow_exec", 2019 "lsid = ?", "id = ?"); 2020 } 2021 2022 if(_psChangeExecutionReferralList == null && 2023 _schema.containsTable("workflow_exec")) 2024 { 2025 _psChangeExecutionReferralList = _dbType.getSQLUpdate("workflow_exec", 2026 "derived_from = ?", "id = ?"); 2027 } 2028 2029 if(_psChangeExecutionType == null && 2030 _schema.containsTable("workflow_exec")) 2031 { 2032 _psChangeExecutionType = _dbType.getSQLUpdate("workflow_exec", 2033 "type = ?", "lsid = ?"); 2034 } 2035 2036 if(_psAssocDataInsert == null && 2037 _schema.containsTable("associated_data")) 2038 { 2039 _psAssocDataInsert = _dbType.getSQLInsert("associated_data", "id", 2040 "data_id, name, val, wf_exec_id", "?, ?, 
?, ?"); 2041 } 2042 2043 if(_psAssocDataInsertNoDataId == null && 2044 _schema.containsTable("associated_data")) 2045 { 2046 _psAssocDataInsertNoDataId = _dbType.getSQLInsert("associated_data", "id", 2047 "name, val, wf_exec_id", "?, ?, ?"); 2048 } 2049 2050 if(_psErrorInsert == null && 2051 _schema.containsTable("error")) 2052 { 2053 _psErrorInsert = _dbType.getSQLInsert("error", "id", 2054 "entity_id, exec_id, message", "?, ?, ?"); 2055 } 2056 2057 // tagging 2058 2059 if(_psGetTagForExecIdAndURN == null && _schema.containsTable("tag")) 2060 { 2061 _psGetTagForExecIdAndURN = _dbType.getSQLSelect("tag", "urn", "wf_exec_id = ? and urn = ?"); 2062 } 2063 2064 if(_psTagInsert == null && _schema.containsTable("tag")) 2065 { 2066 _psTagInsert = _dbType.getSQLInsert("tag", "type, searchstring, urn, wf_exec_id", 2067 "?,?,?,?"); 2068 } 2069 2070 if(_psDeleteTagForExecIdAndURN == null && _schema.containsTable("tag")) 2071 { 2072 _psDeleteTagForExecIdAndURN = _dbType.getSQLDelete("tag", 2073 "wf_exec_id = ? and urn = ?"); 2074 } 2075 2076 //TODO use or create a getSQLxxxx ? 
2077 /* 2078 if(_psDeleteExecutions == null && 2079 _schema.containsTable("workflow_exec")) 2080 { 2081 String wfExecStr = _dbType.getTableName("workflow_exec"); 2082 String queryStr = "DELETE FROM " + 2083 wfExecStr + " wfe " + 2084 "WHERE wfe.id IN (?)"; 2085 _psDeleteExecutions = _dbType.getPrepStatement(queryStr); 2086 } 2087 if(_psDeleteData == null){ 2088 2089 } 2090 */ 2091 2092 if(_psParameterExecInsert == null && 2093 _schema.containsTable("parameter_exec")) 2094 { 2095 _psParameterExecInsert = _dbType.getSQLInsert("parameter_exec", 2096 "parameter_id, wf_exec_id", "?, ?"); 2097 } 2098 2099 String wfStr = _dbType.getTableName("workflow"); 2100 String wfExecStr = _dbType.getTableName("workflow_exec"); 2101 2102 if(_psDeleteWorkflowsForNoExecutions == null) 2103 { 2104 String sqlStr = "DELETE FROM " + wfStr + " wf " + 2105 "WHERE wf.id NOT IN (SELECT wf_id FROM " + wfExecStr + ")"; 2106 _psDeleteWorkflowsForNoExecutions = _dbType.getPrepStatement(sqlStr); 2107 } 2108 2109 if(_psWorkflowIdsForNoExecutions == null) 2110 { 2111 String sqlStr = "SELECT wf.id FROM " + wfStr + " wf " + 2112 "WHERE wf.id NOT IN (SELECT wf_id FROM " + wfExecStr + ")"; 2113 _psWorkflowIdsForNoExecutions = _dbType.getPrepStatement(sqlStr); 2114 } 2115 2116 // create the remaining prepared statements. 2117 super._createPreparedStatements(); 2118 } 2119 2120 /** Create a Schema to reflect the v8 schema. */ 2121 @Override 2122 protected Schema _createSchema() 2123 { 2124 return Schemas.createSchemaV8(); 2125 } 2126 2127 /** Get the maximum size of token data that can be stored in the 2128 * database. 2129 */ 2130 @Override 2131 protected int _getMaxTokenDataSize() throws SQLException 2132 { 2133 return _dbType.getColumnSize("data", "contents"); 2134 } 2135 2136 /** Get the internal workflow id. 
*/ 2137 @Override 2138 protected int _getWorkflowId() throws RecordingException 2139 { 2140 if(_wfLSID == null) 2141 { 2142 _wfLSID = NamedObjId.getIdFor(_recorderContainer); 2143 } 2144 2145 //System.out.println("WF LSID is " + _wfLSID); 2146 2147 // use the lsid string as the string id. 2148 return _getWorkflowId(_wfLSID.toStringWithoutRevision()); 2149 } 2150 2151 /** Find the container id of an entity. Always returns 2152 * RegEntity.UNKNOWN_ID since the V8 schema does not use the 2153 * container id. 2154 */ 2155 @Override 2156 protected int _getContainerId(Nameable namedObj) 2157 throws RecordingException 2158 { 2159 return RegEntity.UNKNOWN_ID; 2160 } 2161 2162 /** Set our prepared statements to null. */ 2163 @Override 2164 protected void _nullPreparedStatements() 2165 { 2166 super._nullPreparedStatements(); 2167 2168 _psDataInsert = null; 2169 _psDataMD5Query = null; 2170 _psPortEventInsert = null; 2171 _psPortEventInsertWithChecksum = null; 2172 _psChangeWorkflowName = null; 2173 _psChangeExecutionLSID = null; 2174 _psAssocDataInsert = null; 2175 _psAssocDataInsertNoDataId = null; 2176 _psErrorInsert = null; 2177 _psDeleteExecutions = null; 2178 _psDeleteData = null; 2179 _psGetWfContentsIdsToDeleteQuery = null; 2180 _psGetWfContentsIdsToNotDeleteQuery = null; 2181 _psGetAssociatedDataDataIdsToDeleteQuery = null; 2182 _psGetPortEventDataIdsToDeleteQuery = null; 2183 _psGetTagForExecIdAndURN = null; 2184 _psTagInsert = null; 2185 _psDeleteTagForExecIdAndURN = null; 2186 _psDeleteTags = null; 2187 _psParameterExecInsert = null; 2188 _psDeleteWorkflowsForNoExecutions = null; 2189 _psWorkflowIdsForNoExecutions = null; 2190 _psChangeExecutionType = null; 2191 } 2192 2193 /** Reconnect to the database. 
*/ 2194 @Override 2195 protected void _reconnectDatabase(boolean resetDB) throws RecordingException 2196 { 2197 super._reconnectDatabase(resetDB); 2198 2199 try 2200 { 2201 _queryable = new SQLQueryV8(_dbType); 2202 } 2203 catch(QueryException e) 2204 { 2205 throw new RecordingException("Error creating queryable: ", e); 2206 } 2207 } 2208 2209 /** Re-acquire the current workflow id. */ 2210 @Override 2211 protected void _reconnectWorkflow() throws RecordingException 2212 { 2213 super._reconnectWorkflow(); 2214 2215 //_debug("reconnectWorkflow()"); 2216 //_debug(" wf name : " + _wfNameStr); 2217 //_debug(" _containerFullName: " + _containerFullName); 2218 2219 2220 // see if the workflow name is empty and the container has a name 2221 // (the latter means the workflow has been saved to a file). 2222 if(_wfNameStr.length() == 0 && 2223 _containerFullName.length() > 1 && !_containerFullName.equals( 2224 ".configuration.effigyFactory.Graph Editor.blank")) 2225 { 2226 // change the workflow name to the top level 2227 // composite's name. 2228 _changeWorkflowNameColumn(_recorderContainer.getName()); 2229 } 2230 } 2231 2232 /** Change the workflow.name column for the current workflow. */ 2233 protected void _changeWorkflowNameColumn(String newName) 2234 throws RecordingException 2235 { 2236 if(_wfLSID == null) 2237 { 2238 throw new RecordingException("No workflow LSID"); 2239 } 2240 2241 _changeWorkflowNameColumn(newName, _wfLSID); 2242 } 2243 2244 /** Change the workflow.name column for a specific LSID. 
*/ 2245 protected void _changeWorkflowNameColumn(String newName, KeplerLSID lsid) 2246 throws RecordingException 2247 { 2248 //_debug(" changing wf name from " + _wfNameStr + " to " + newName); 2249 2250 // change the name in the workflow table 2251 try 2252 { 2253 _psChangeWorkflowName.setString(1, newName); 2254 _psChangeWorkflowName.setString(2, lsid.toStringWithoutRevision()); 2255 _psChangeWorkflowName.execute(); 2256 } 2257 catch(SQLException e) 2258 { 2259 throw new RecordingException("Unable to change workflow name:", e); 2260 } 2261 2262 if(_debugWriter != null) 2263 { 2264 _debugWrite("UPDATE workflow SET name = " + newName + " " + 2265 "WHERE lsid = " + lsid.toStringWithoutRevision()); 2266 } 2267 2268 _wfNameStr = newName; 2269 } 2270 2271 /** 2272 * Change execution LSID for an execution Id. Does not change and returns 2273 * false for attempts to change to an older or current revision, or if a 2274 * QueryException, or if no such execution Id. 2275 * 2276 * @param execId 2277 * @param newExecLSID 2278 * @return 2279 * @throws RecordingException 2280 */ 2281 @Override 2282 public boolean changeExecutionLSID(int execId, KeplerLSID newExecLSID, 2283 Queryable q) throws RecordingException 2284 { 2285 2286 // make sure we're connected to db 2287 _checkConnection(false); 2288 2289 KeplerLSID currentLSID; 2290 2291 try 2292 { 2293 currentLSID = q.getExecutionLSIDForExecution(execId); 2294 } 2295 catch (QueryException e1) 2296 { 2297 // TODO Auto-generated catch block 2298 e1.printStackTrace(); 2299 return false; 2300 } 2301 2302 if (currentLSID == null) 2303 { 2304 return false; 2305 } 2306 2307 if (currentLSID.equalsWithoutRevision(newExecLSID)) 2308 { 2309 // disallow changing to an older exec LSID 2310 if (currentLSID.getRevision() >= newExecLSID.getRevision()) 2311 { 2312 // _warn("Current executionLSID:"+ currentLSID + 2313 // " >= "+newExecLSID+" not changing."); 2314 return false; 2315 } 2316 } 2317 else 2318 { 2319 // TODO no need to do anything 
special for lsid 2320 // notLocalToInstance, is there? 2321 } 2322 2323 try 2324 { 2325 2326 _psChangeExecutionLSID.setString(1, newExecLSID.toString()); 2327 _psChangeExecutionLSID.setInt(2, execId); 2328 _psChangeExecutionLSID.execute(); 2329 2330 if (_debugWriter != null) 2331 { 2332 _debugWrite("UPDATE WORKFLOW_EXEC SET lsid = " + newExecLSID 2333 + " " + "WHERE id = lsid"); 2334 } 2335 return true; 2336 2337 } 2338 catch (SQLException e) 2339 { 2340 throw new RecordingException("Could not change execution lsid:", e); 2341 } 2342 } 2343 2344 /** 2345 * Change the execution type. 2346 * @param executionLSID 2347 * @param type 2348 * @return 2349 * @throws RecordingException 2350 */ 2351 public boolean changeExecutionType(KeplerLSID executionLSID, WorkflowRun.type type) throws RecordingException{ 2352 try 2353 { 2354 _psChangeExecutionType.setString(1, type.toString()); 2355 _psChangeExecutionType.setString(2, executionLSID.toString()); 2356 _psChangeExecutionType.execute(); 2357 2358 if (_debugWriter != null) 2359 { 2360 _debugWrite("UPDATE WORKFLOW_EXEC SET type = " + type.toString() 2361 + " " + "WHERE lsid = lsid"); 2362 } 2363 return true; 2364 2365 } 2366 catch (SQLException e) 2367 { 2368 throw new RecordingException("Could not change execution type:", e); 2369 2370 } 2371 } 2372 2373 /** Add a port to the port table. 
*
     * @param port the port to add.
     * @param re the registered entity for the port; its id is stored.
     * @throws RecordingException if the insert fails.
     */
    @Override
    protected void _regPortReal(TypedIOPort port, RegEntity re)
        throws RecordingException
    {
        // whether the port is a multiport; stored in the row.
        boolean multi = port.isMultiport();

        // the direction is stored as the ordinal of PortDirection.
        int direction;
        if(port.isInput() && port.isOutput())
        {
            direction = PortDirection.InOut.ordinal();
        }
        else if(port.isInput())
        {
            direction = PortDirection.In.ordinal();
        }
        else
        {
            // any port that is not an input is recorded as an output.
            direction = PortDirection.Out.ordinal();
        }

        try
        {
            // bind (entity id, direction, multiport flag) and insert.
            synchronized(_psPortInsert)
            {
                _psPortInsert.setInt(1, re.getId());
                _psPortInsert.setInt(2, direction);
                _psPortInsert.setBoolean(3, multi);
                _psPortInsert.executeUpdate();
            }

            if(_debugWriter != null)
            {
                _debugWrite("INSERT INTO PORT (id, " + direction +
                    ", " + multi + ")");
            }

        }
        catch(SQLException e)
        {
            // NOTE(review): only the message is kept; e is not attached
            // as the cause of the RecordingException.
            throw new RecordingException(_getExceptionMessage(e));
        }
    }


    /** Record a specific type of firing for an actor.
     *
     * <p>An actor_fire row is inserted when an iteration starts:
     * BEFORE_ITERATE, or BEFORE_PREFIRE when the previous start was not
     * BEFORE_ITERATE. The row's end time is recorded when the iteration
     * ends: AFTER_ITERATE, or AFTER_POSTFIRE following a BEFORE_PREFIRE
     * start. All other firing types are ignored.
     *
     * @param actor the actor that fired; must have a fire state.
     * @param type the kind of firing event.
     * @param timestamp the time of the event.
     * @throws RecordingException if the actor has no fire state or the
     * firing cannot be recorded.
     */
    @Override
    protected void _recordFiringEvent(Actor actor,
        FiringEvent.FiringEventType type, Date timestamp) throws RecordingException
    {
        /*
        System.out.println("firing event: " + type +
            " for " + _getNameableFullName(actor) +
            " at " + timestamp);
        */

        // see if it's a start or stop.
        FireState<Integer> fs = _fireStateTable.get(actor);

        // make sure the fire state exists.
        if(fs == null)
        {
            throw new RecordingException(
                "Received actor fire event for unregistered actor: " +
                _getNameableFullName(actor));
        }

        // serialize all start/stop bookkeeping for this actor.
        synchronized(fs)
        {
            // get the last type of firing start
            FiringEvent.FiringEventType lastStartType =
                fs.getLastStartFireType();

            // see if current firing is new iteration:
            // NOTE: PN does not report iterate firings so the iteration
            // may begin with prefire if the last type of firing was not
            // iterate.
            if(type == FiringEvent.BEFORE_ITERATE ||
                (type == FiringEvent.BEFORE_PREFIRE &&
                lastStartType != FiringEvent.BEFORE_ITERATE))
            {
                int fireId;

                //_debug("start firing: " + type + " for " + _getNameableFullName(actor));

                try
                {
                    // insert (actor id, execution id, start time); the
                    // generated id identifies this firing.
                    synchronized(_psActorFireStart)
                    {
                        _psActorFireStart.setInt(1, fs.getActorId());
                        _psActorFireStart.setInt(2, _wfExecId);
                        _psActorFireStart.setTimestamp(3, new Timestamp(timestamp.getTime()));
                        fireId = _dbType.insert(_psActorFireStart,
                            "actor_fire", "id");
                    }

                }
                catch(SQLException e)
                {
                    String message = "Unable to record start of actor " +
                        "firing: " + e.getMessage();
                    // NOTE(review): unlike the other debug writes, this call
                    // is not guarded by _debugWriter != null; confirm that
                    // _debugWrite tolerates a missing writer.
                    _debugWrite(message);
                    throw new RecordingException(message);
                }

                /*
                System.out.println("started firing " + fireId + ": " +
                    " for " + _getNameableFullName(actor) +
                    " at " + timestamp);
                */

                if(_debugWriter != null)
                {
                    _debugWrite("INSERT INTO ACTOR_FIRE(" +
                        fs.getActorId() + ", " + _wfExecId + ", " +
                        "curTime)");
                }

                // remember the firing id so the matching stop can find it.
                fs.fireStart(type, fireId);
            }
            // see if current firing is end of iteration:
            else if(type == FiringEvent.AFTER_ITERATE ||
                (type == FiringEvent.AFTER_POSTFIRE &&
                lastStartType == FiringEvent.BEFORE_PREFIRE))
            {
                //_debug("end firing: " + type + " for " + _getNameableFullName(actor));

                Integer fireIdInteger;
                // NOTE: if the type is a AFTER_POSTFIRE and last start
                // type is BEFORE_PREFIRE, we are running in PN.
                // in this case, tell the fireState to stop firing using
                // AFTER_PREFIRE instead of AFTER_POSTFIRE, since we never
                // told the fireState about the BEFORE_POSTFIRE.
                if(type == FiringEvent.AFTER_POSTFIRE)
                {
                    fireIdInteger = fs.fireStop(FiringEvent.AFTER_PREFIRE);
                }
                else
                {
                    fireIdInteger = fs.fireStop(type);
                }

                // fall back to UNKNOWN_ID when the fire state has no id.
                int fireId = RegEntity.UNKNOWN_ID;
                if(fireIdInteger != null)
                {
                    fireId = fireIdInteger;
                }

                // try the update at most twice, reconnecting to the
                // database between the two attempts.
                int tries = 2;
                SQLException exception = null;
                while(tries > 0)
                {
                    synchronized(_psActorFireStop)
                    {
                        try
                        {
                            tries--;
                            exception = null;
                            _recordActorFireStop(fireId, actor, timestamp);
                            break;
                        }
                        catch(SQLException e)
                        {
                            exception = e;
                        }
                    }

                    if(exception != null && tries > 0)
                    {
                        _reconnectDatabase(false);
                    }
                }

                // both attempts failed.
                if(exception != null)
                {
                    throw new RecordingException("Unable to record " +
                        "end of actor firing: ", exception);
                }
            }
            /*
            else
            {
                _debug("ignoring firing: " + type + " for " +
                    _getNameableFullName(actor));
            }
            */
        }
    }

    /** Record the end of an actor fire and attempt to serialize the actor's
     * state.
2564 */ 2565 protected void _recordActorFireStop(int fireId, Actor actor, Date timestamp) throws SQLException 2566 { 2567 2568 /* 2569 System.out.println("stop firing " + fireId + ": " + 2570 " for " + _getNameableFullName(actor) + 2571 " at " + timestamp); 2572 */ 2573 2574 synchronized(_psActorFireStop) 2575 { 2576 _psActorFireStop.setTimestamp(1, new Timestamp(timestamp.getTime())); 2577 _psActorFireStop.setInt(2, fireId); 2578 _psActorFireStop.executeUpdate(); 2579 } 2580 2581 if(_debugWriter != null) 2582 { 2583 try { 2584 _debugWrite("UPDATE ACTOR_FIRE set end_time " + 2585 "= curTime WHERE id = " + fireId + ")"); 2586 } catch (RecordingException e) { 2587 System.out.println("Error writing to debug writer: " + e.getMessage()); 2588 } 2589 } 2590 2591 if(_stateSerializer != null) 2592 { 2593 byte[] state = _stateSerializer.serializeActor(actor); 2594 if(state != null) 2595 { 2596 /* XXX todo once state table is created 2597 PreparedStatement _psActorStateInsert = _dbType.getSQLInsert("actor_state", "id, state", "?, ?"); 2598 synchronized(_psActorStateInsert) 2599 { 2600 _psActorStateInsert.setInt(1, fireId); 2601 _psActorStateInsert.setBytes(2, state); 2602 _psActorStateInsert.executeUpdate(); 2603 } 2604 */ 2605 2606 if(_debugWriter != null) 2607 { 2608 try { 2609 _debugWrite("INSERT INTO ACTOR_STATE(" + fireId + ", " + String.valueOf(state) + ")"); 2610 } catch (RecordingException e) { 2611 System.out.println("Error writing to debug writer: " + e.getMessage()); 2612 } 2613 } 2614 } 2615 } 2616 } 2617 2618 /** Record a specific type of firing for an actor. 
*/ 2619 @Override 2620 protected void _recordPortEvent(IOPort port, int fireId, int rwfireId, 2621 boolean isRead, int channel, Token token, IOPort destPort, Date timestamp) 2622 throws RecordingException 2623 { 2624 //if(isRead) _debug("port event read " + _getNameableFullName(port)); 2625 //else _debug("port event write " + _getNameableFullName(port)); 2626 2627 RegEntity.EntityType type; 2628 2629 if(port instanceof ParameterPort) 2630 { 2631 type = RegEntity.EntityType.PortParameter; 2632 } 2633 else 2634 { 2635 type = RegEntity.EntityType.Port; 2636 } 2637 2638 RegEntity re = _checkEntity(port, type); 2639 2640 if(re == null) 2641 { 2642 throw new RecordingException("Port has not been registered: " + 2643 _getNameableFullName(port)); 2644 } 2645 2646 // an actor may read or write a port in a non-fire method, e.g., 2647 // SampleDelay. in this case, generate fake before and after fire 2648 // events since the firing id is a foreign key. 2649 boolean fakeFired = false; 2650 if(fireId == RegEntity.UNKNOWN_ID) 2651 { 2652 fakeFired = true; 2653 Actor actor = (Actor)port.getContainer(); 2654 Director director = actor.getDirector(); 2655 actorFire(new FiringEvent(director, actor, FiringEvent.BEFORE_ITERATE), 2656 timestamp); 2657 2658 // now get the firing id 2659 FireState<Integer> fs = _fireStateTable.get(actor); 2660 fireId = fs.getCurFireId(); 2661 2662 // XXX print a warning if the actor is not sampledelay 2663 if(! 
(actor instanceof SampleDelay)) 2664 { 2665 _warn("actor port event in non-fire method: " + actor.getClass()); 2666 } 2667 2668 } 2669 2670 // get the port id 2671 int portId = re.getId(); 2672 2673 String dataMD5Str = null; 2674 String fileMD5Str = ""; 2675 String typeStr = null; 2676 String tokenValue = null; 2677 2678 int writeId = -1; 2679 2680 // if this is a read event, retrieve the write even id that 2681 // produced this token 2682 if(isRead) 2683 { 2684 TokenInfo tokenInfo = _portConnector.getNextId(port, channel); 2685 writeId = tokenInfo.writeId; 2686 // NOTE: these are not necessary for reads since they are 2687 // in the row for the write event. 2688 //dataMD5Str = tokenInfo.dataMD5; 2689 //fileMD5Str = tokenInfo.fileMD5; 2690 //tokenValue = tokenInfo.tokenValue; 2691 //typeStr = token.getClass().getName(); 2692 } 2693 else 2694 { 2695 typeStr = token.getClass().getName(); 2696 2697 tokenValue = token.toString(); 2698 2699 // only checksum the data is larger than port_event.data column 2700 if(tokenValue.length() > MAX_PORT_EVENT_DATA_LENGTH) { 2701 dataMD5Str = _checkData(token.toString().getBytes()); 2702 } 2703 2704 if(_maxFileIncludeSizeKBVal > 0) 2705 { 2706 fileMD5Str = _checkForFileData(token); 2707 } 2708 } 2709 2710 // insert the new row in port_event table. 2711 try 2712 { 2713 if(dataMD5Str == null) { 2714 synchronized(_psPortEventInsert) 2715 { 2716 _psPortEventInsert.setInt(1, channel); 2717 _psPortEventInsert.setString(2, tokenValue); 2718 _psPortEventInsert.setInt(3, fireId); 2719 _psPortEventInsert.setString(4, fileMD5Str); 2720 _psPortEventInsert.setInt(5, portId); 2721 _psPortEventInsert.setTimestamp(6, new Timestamp(timestamp.getTime())); 2722 _psPortEventInsert.setString(7, typeStr); 2723 _psPortEventInsert.setInt(8, writeId); 2724 2725 // save the value we wrote into the write_event_id column 2726 // for debug writer. 
2727 int writeEventId = writeId; 2728 2729 writeId = _dbType.insert(_psPortEventInsert, "port_event", "id"); 2730 2731 if(_debugWriter != null) 2732 { 2733 _debugWrite("INSERT INTO PORT_EVENT(" + 2734 channel + ", " + 2735 tokenValue + ", " + 2736 fireId + ", " + 2737 fileMD5Str + ", " + 2738 portId + ", " + 2739 "curTime, " + 2740 typeStr + ", " + 2741 writeEventId + ")"); 2742 } 2743 } 2744 2745 } else { 2746 synchronized(_psPortEventInsertWithChecksum) 2747 { 2748 _psPortEventInsertWithChecksum.setInt(1, channel); 2749 _psPortEventInsertWithChecksum.setString(2, dataMD5Str); 2750 _psPortEventInsertWithChecksum.setInt(3, fireId); 2751 _psPortEventInsertWithChecksum.setString(4, fileMD5Str); 2752 _psPortEventInsertWithChecksum.setInt(5, portId); 2753 _psPortEventInsertWithChecksum.setTimestamp(6, new Timestamp(timestamp.getTime())); 2754 _psPortEventInsertWithChecksum.setString(7, typeStr); 2755 _psPortEventInsertWithChecksum.setInt(8, writeId); 2756 2757 // save the value we wrote into the write_event_id column 2758 // for debug writer. 
2759 int writeEventId = writeId; 2760 2761 writeId = _dbType.insert(_psPortEventInsertWithChecksum, "port_event", 2762 "id"); 2763 2764 if(_debugWriter != null) 2765 { 2766 _debugWrite("INSERT INTO PORT_EVENT(" + 2767 channel + ", " + 2768 dataMD5Str + ", " + 2769 fireId + ", " + 2770 fileMD5Str + ", " + 2771 portId + ", " + 2772 "curTime, " + 2773 typeStr + ", " + 2774 writeEventId + ")"); 2775 } 2776 } 2777 } 2778 2779 // if this is a write event, update the queues for each 2780 // of the receiving ports 2781 if(!isRead) 2782 { 2783 //_debug("updating any queues connected to " + _getNameableFullName(port)); 2784 2785 TokenInfo tokenInfo = new TokenInfo(writeId, dataMD5Str, 2786 fileMD5Str, tokenValue); 2787 2788 if(destPort == null) 2789 { 2790 _portConnector.sendIdToConnections(port, channel, 2791 tokenInfo); 2792 } 2793 else 2794 { 2795 _portConnector.receiveId(destPort, channel, tokenInfo); 2796 } 2797 } 2798 } 2799 catch(SQLException e) 2800 { 2801 throw new RecordingException(_getExceptionMessage(e)); 2802 } 2803 2804 if(fakeFired) 2805 { 2806 Actor actor = (Actor)port.getContainer(); 2807 Director director = actor.getDirector(); 2808 actorFire(new FiringEvent(director, actor, FiringEvent.AFTER_ITERATE)); 2809 } 2810 } 2811 2812 /** Adds data to the data table if not already there. 
2813 * @return the md5 of the data 2814 */ 2815 protected String _checkData(byte[] data) throws RecordingException 2816 { 2817 boolean truncated = false; 2818 2819 // see if data is too large 2820 if(data.length > _maxDataSize) 2821 { 2822 truncated = true; 2823 } 2824 2825 // compute checksum 2826 String retval = _getMD5(data); 2827 2828 boolean found = false; 2829 2830 try 2831 { 2832 // query table for checksum 2833 synchronized(_psDataMD5Query) 2834 { 2835 _psDataMD5Query.setString(1, retval); 2836 ResultSet result = _psDataMD5Query.executeQuery(); 2837 2838 if(result.next()) 2839 { 2840 found = true; 2841 } 2842 2843 result.close(); 2844 } 2845 } 2846 catch(SQLException e) 2847 { 2848 throw new RecordingException("Unable to query data table: " + 2849 e.getMessage()); 2850 } 2851 2852 // if not found insert 2853 if(!found) 2854 { 2855 try 2856 { 2857 synchronized(_psDataInsert) 2858 { 2859 _psDataInsert.setBytes(1, data); 2860 _psDataInsert.setString(2, retval); 2861 _psDataInsert.setBoolean(3, truncated); 2862 _psDataInsert.executeUpdate(); 2863 } 2864 2865 if(_debugWriter != null) 2866 { 2867 // both data and md5 can change depending on time, so don't output. 2868 _debugWrite("INSERT INTO DATA(" + 2869 "data, " + 2870 "md5, " + 2871 truncated + ")"); 2872 } 2873 } 2874 catch(SQLException e) 2875 { 2876 throw new RecordingException("Unable to insert into data " + 2877 "table: " + e.getMessage()); 2878 } 2879 } 2880 return retval; 2881 } 2882 2883 /** Adds data to the data table if not already there. 
2884 * @return the md5 of the data 2885 */ 2886 protected String _checkData(File file) throws RecordingException 2887 { 2888 String retval = ""; 2889 int fileBytes = Long.valueOf(file.length()).intValue(); 2890 FileInputStream stream = null; 2891 2892 try 2893 { 2894 try 2895 { 2896 stream = new FileInputStream(file); 2897 } 2898 catch(FileNotFoundException e) 2899 { 2900 throw new RecordingException("File " + file.getAbsolutePath() + 2901 " not found: ", e); 2902 } 2903 2904 byte[] data = new byte[fileBytes]; 2905 2906 try 2907 { 2908 if(stream.read(data) != fileBytes) 2909 { 2910 throw new RecordingException("Unable to read entire " + 2911 "file: " + file.getAbsolutePath()); 2912 } 2913 } 2914 catch(IOException e) 2915 { 2916 throw new RecordingException("Error reading file " + 2917 file.getAbsolutePath() + ": ", e); 2918 } 2919 2920 retval = _checkData(data); 2921 } 2922 finally 2923 { 2924 if(stream != null) 2925 { 2926 try 2927 { 2928 stream.close(); 2929 } 2930 catch(IOException e) 2931 { 2932 throw new RecordingException("Error closing file " + 2933 file.getAbsolutePath() + ": ", e); 2934 } 2935 } 2936 } 2937 2938 return retval; 2939 } 2940 2941 /** See if a token contains a valid filename. 2942 * @return If token contains a valid filename, and its size is less than 2943 * minFileIncludeKB, then returns the MD5 of the file's contents. 
2944 */ 2945 protected String _checkForFileData(Token token) throws RecordingException 2946 { 2947 String retval = ""; 2948 2949 ArrayToken arrayToken = null; 2950 RecordToken recordToken = null; 2951 2952 if(token instanceof StringToken) 2953 { 2954 String possibleFilePath = ((StringToken)token).stringValue(); 2955 File possibleFile = new File(possibleFilePath); 2956 2957 //_debug("seeing if file: " + possibleFile.getAbsolutePath()); 2958 2959 int fileBytes = Long.valueOf(possibleFile.length()).intValue(); 2960 float fileKiloBytes = 0; 2961 if(fileBytes > 1023) 2962 { 2963 fileKiloBytes = Float.valueOf(fileBytes) / 1024; 2964 } 2965 2966 // see if file is a file (not a directory), is readable, 2967 // and is less than the max size. 2968 if(possibleFile.isFile() && possibleFile.canRead() && 2969 fileKiloBytes <= _maxFileIncludeSizeKBVal) 2970 { 2971 /* 2972 _debug(" file is readable and under max (size = " + 2973 fileKiloBytes + " max = " + _maxFileIncludeSizeKBVal + 2974 "); going to include"); 2975 */ 2976 2977 retval = _checkData(possibleFile); 2978 } 2979 } 2980 //if ArrayToken or RecordToken recurse to check elements 2981 else if (token instanceof ArrayToken){ 2982 arrayToken = ((ArrayToken)token); 2983 for (int i=0; i<arrayToken.length(); i++){ 2984 final Token aToken = arrayToken.getElement(i); 2985 if (!aToken.isNil()){ 2986 _checkForFileData(aToken); 2987 } 2988 } 2989 } 2990 else if (token instanceof RecordToken){ 2991 recordToken = ((RecordToken)token); 2992 Iterator<?> recordTokenItr = recordToken.labelSet().iterator(); 2993 while (recordTokenItr.hasNext()){ 2994 final String label = (String)recordTokenItr.next(); 2995 final Token aToken = recordToken.get(label); 2996 if (!aToken.isNil()){ 2997 _checkForFileData(aToken); 2998 } 2999 } 3000 } 3001 3002 return retval; 3003 } 3004 3005 /** Reset when we use a different db connection. 
*/ 3006 @Override 3007 protected void _dbReset() throws RecordingException 3008 { 3009 _wfLSID = _containerLSID; 3010 super._dbReset(); 3011 } 3012 3013 /** Reset when we use a different workflow. */ 3014 @Override 3015 protected void _wfReset() 3016 { 3017 _wfLastExecId = RegEntity.UNKNOWN_ID; 3018 super._wfReset(); 3019 } 3020 3021 /** Add all the parameter ids to the parameter_exec table. */ 3022 protected void _updateParameterExecTable() throws RecordingException 3023 { 3024 // XXX synchronize access to _entityCacheTable 3025 for(Map.Entry<Nameable,RegEntity> entry : _entityCacheTable.entrySet()) 3026 { 3027 Nameable nameable = entry.getKey(); 3028 RegEntity entity = entry.getValue(); 3029 RegEntity.EntityType type = entity.getType(); 3030 3031 // add to parameter exec table if a parameter or unconnected 3032 // port parameter. don't add connected port parameters since 3033 // their values come from the port. 3034 if(type == RegEntity.EntityType.Parameter || 3035 ((nameable instanceof PortParameter) && 3036 ((PortParameter)nameable).getPort().numberOfSources() == 0)) 3037 { 3038 try 3039 { 3040 _psParameterExecInsert.setInt(1, entity.getId()); 3041 _psParameterExecInsert.setInt(2, _wfExecId); 3042 3043 _psParameterExecInsert.executeUpdate(); 3044 3045 if(_debugWriter != null) 3046 { 3047 // both data and md5 can change depending on time, so don't output. 3048 /* XXX 3049 _debugWrite("INSERT INTO PARAMETER_EXEC(" + 3050 entity.getId() + ", " + _wfExecId + ")"); 3051 */ 3052 } 3053 } 3054 catch(SQLException e) 3055 { 3056 throw new RecordingException("Error updating parameter_exec table: ", e); 3057 } 3058 } 3059 } 3060 } 3061 3062 //////////////////////////////////////////////////////////////////////// 3063 //// protected classes //// 3064 3065 /** The class used in the port queues to contain the port event write id 3066 * and data md5. 
3067 */ 3068 protected static class TokenInfo 3069 { 3070 /** Construct a new PortQueueEntry for a write id and md5 of the 3071 * data. 3072 */ 3073 TokenInfo(int writeId, String dataMD5, String fileMD5, String tokenValue) 3074 { 3075 this.writeId = writeId; 3076 this.dataMD5 = dataMD5; 3077 this.fileMD5 = fileMD5; 3078 this.tokenValue = tokenValue; 3079 } 3080 3081 /** Get a string representation. */ 3082 @Override 3083 public String toString() 3084 { 3085 return "writeId = " + writeId; // + " data md5 = " + _dataMD5; 3086 } 3087 3088 /** The MD5 checksum. */ 3089 public String dataMD5; 3090 3091 /** The file MD5 checksum. */ 3092 public String fileMD5; 3093 3094 /** The port event write id. */ 3095 public int writeId; 3096 3097 public String tokenValue; 3098 } 3099 3100 /** A subclass of SQLRecordingParameters that adds more parameters. */ 3101 protected static class SQLRecordingParametersV8 3102 extends SQLRecordingParameters 3103 { 3104 /** Construct a new SQLRecordParametersV8. */ 3105 protected SQLRecordingParametersV8(NamedObj no) 3106 throws IllegalActionException, NameDuplicationException 3107 { 3108 super(no); 3109 addIntParameter(maxFileIncludeSizeKBStr, _maxFileIncludeSizeKBDefaultVal); 3110 addStringParameter(execAnnotationStr); 3111 addStringParameter(nextExecLSIDStr); 3112 addBooleanParameter(watchForLSIDChangesStr, true); 3113 } 3114 3115 /** Name of parameter that specifies maximum size, in KB, of file 3116 * to include in provenance. 3117 */ 3118 protected static final String maxFileIncludeSizeKBStr = 3119 "maxFileInclusionSizeKB"; 3120 3121 /** Name of execution annotation parameter. */ 3122 protected static final String execAnnotationStr = "execAnnotation"; 3123 3124 /** Name of next execution LSID parameter. */ 3125 protected static final String nextExecLSIDStr = "nextExecLSID"; 3126 3127 /** Name of watch for LSID changes parameter. 
*/ 3128 protected static final String watchForLSIDChangesStr = "watchForLSIDChanges"; 3129 } 3130 3131 //////////////////////////////////////////////////////////////////////// 3132 //// protected variables //// 3133 3134 /** Class for token dependencies. */ 3135 protected PortConnector<TokenInfo> _portConnector; 3136 3137 // prepared statements 3138 protected PreparedStatement _psDataInsert; 3139 protected PreparedStatement _psDataMD5Query; 3140 protected PreparedStatement _psPortEventInsert; 3141 protected PreparedStatement _psPortEventInsertWithChecksum; 3142 protected PreparedStatement _psChangeWorkflowName; 3143 protected PreparedStatement _psChangeExecutionLSID; 3144 protected PreparedStatement _psChangeExecutionReferralList; 3145 protected PreparedStatement _psChangeExecutionType; 3146 protected PreparedStatement _psAssocDataInsert; 3147 protected PreparedStatement _psAssocDataInsertNoDataId; 3148 protected PreparedStatement _psErrorInsert; 3149 protected PreparedStatement _psDeleteExecutions; 3150 protected PreparedStatement _psDeleteData; 3151 protected PreparedStatement _psGetWfContentsIdsToDeleteQuery; 3152 protected PreparedStatement _psGetWfContentsIdsToNotDeleteQuery; 3153 protected PreparedStatement _psGetAssociatedDataDataIdsToDeleteQuery; 3154 protected PreparedStatement _psGetAssociatedDataDataIdsToNotDeleteQuery; 3155 protected PreparedStatement _psGetPortEventDataIdsToDeleteQuery; 3156 protected PreparedStatement _psGetPortEventDataIdsToNotDeleteQuery; 3157 protected PreparedStatement _psGetTagForExecIdAndURN; 3158 protected PreparedStatement _psTagInsert; 3159 protected PreparedStatement _psDeleteTagForExecIdAndURN; 3160 protected PreparedStatement _psDeleteTags; 3161 protected PreparedStatement _psParameterExecInsert; 3162 protected PreparedStatement _psDeleteWorkflowsForNoExecutions; 3163 protected PreparedStatement _psWorkflowIdsForNoExecutions; 3164 3165 /** Queryable connection. 
*/ 3166 protected Queryable _queryable; 3167 3168 /** The most recently completed workflow execution id. */ 3169 protected int _wfLastExecId; 3170 3171 /** Value of execution annotation parameter. */ 3172 protected String _execAnnotation; 3173 3174 /** Value of next execution LSID parameter. */ 3175 protected String _nextExecLSIDStr; 3176 3177 /** The current workflow id. */ 3178 protected KeplerLSID _wfLSID; 3179 3180 /** True if executionError is called */ 3181 protected boolean _executionHadAnError; 3182 3183 protected Map<KeplerLSID, Integer> _executionLSIDtoIdMap; 3184 3185 private static final int numMillisecondsInASecond = 1000; 3186 3187 //////////////////////////////////////////////////////////////////////// 3188 //// private methods //// 3189 3190 /** Compute an MD5 checksum from a byte array. */ 3191 private String _getMD5(byte[] data) throws RecordingException 3192 { 3193 String retval = null; 3194 try 3195 { 3196 MessageDigest digest = MessageDigest.getInstance("MD5"); 3197 digest.update(data); 3198 byte[] md5sum = digest.digest(); 3199 3200 //XXX necessary? 3201 BigInteger bigInt = new BigInteger(1, md5sum); 3202 retval = bigInt.toString(16); 3203 3204 // make sure it's 32 chars 3205 while(retval.length() < 32) 3206 { 3207 retval = "0" + retval; 3208 } 3209 } 3210 catch (NoSuchAlgorithmException e) 3211 { 3212 throw new RecordingException("MD5 digest does not exist."); 3213 } 3214 return retval; 3215 } 3216 3217 //////////////////////////////////////////////////////////////////////// 3218 //// private variables //// 3219 3220 /** Maximum size, in kilobytes, of files to include in provenance. */ 3221 private int _maxFileIncludeSizeKBVal; 3222 3223 /** The default value for maximum size of file for inclusion. */ 3224 private final static int _maxFileIncludeSizeKBDefaultVal = 1024; 3225 3226 /** A synchronized collection of instances of this class. 
*/ 3227 private static Set<WeakReference<SQLRecordingV8>> _v8Set = 3228 Collections.synchronizedSet(new HashSet<WeakReference<SQLRecordingV8>>()); 3229 3230 /** If true, when the workflow LSID changes, reconnect to workflow. */ 3231 private boolean _watchForLSIDChanges; 3232 3233 /** The maximum length of the data stored in the port_event table. 3234 * Larger data is stored in the data table. 3235 */ 3236 public final static int MAX_PORT_EVENT_DATA_LENGTH = 4096; 3237 3238}