001/** 002 * '$Author: barseghian $' 003 * '$Date: 2013-01-29 23:22:22 +0000 (Tue, 29 Jan 2013) $' 004 * '$Revision: 31389 $' 005 * 006 * For Details: 007 * http://www.kepler-project.org 008 * 009 * Copyright (c) 2009-2010 The Regents of the 010 * University of California. All rights reserved. Permission is hereby granted, 011 * without written agreement and without license or royalty fees, to use, copy, 012 * modify, and distribute this software and its documentation for any purpose, 013 * provided that the above copyright notice and the following two paragraphs 014 * appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF 015 * CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, 016 * OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS 017 * DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE 018 * POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY 019 * DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 020 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE 021 * SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 022 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 023 * ENHANCEMENTS, OR MODIFICATIONS. 024 */ 025 026package org.kepler.workflowrunmanager; 027 028import java.io.File; 029import java.io.IOException; 030import java.io.InputStream; 031import java.lang.ref.WeakReference; 032import java.sql.Timestamp; 033import java.util.ArrayList; 034import java.util.Date; 035import java.util.Iterator; 036import java.util.LinkedList; 037import java.util.List; 038import java.util.Map; 039import java.util.Vector; 040 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.kepler.authentication.AuthenticationException; 044import org.kepler.build.modules.Module; 045import org.kepler.configuration.ConfigurationManager; 046import org.kepler.configuration.ConfigurationUtilities; 047import org.kepler.kar.KAREntry; 048import org.kepler.kar.KARFile; 049import org.kepler.objectmanager.ObjectManager; 050import org.kepler.objectmanager.cache.CacheException; 051import org.kepler.objectmanager.cache.CacheManager; 052import org.kepler.objectmanager.lsid.KeplerLSID; 053import org.kepler.objectmanager.repository.KARDownloader; 054import org.kepler.provenance.ProvenanceRecorder; 055import org.kepler.provenance.QueryException; 056import org.kepler.provenance.Queryable; 057import org.kepler.provenance.Recording; 058import org.kepler.provenance.RecordingException; 059import org.kepler.provenance.sql.SQLQueryV8; 060import org.kepler.provenance.sql.SQLRecordingV8; 061import org.kepler.sms.NamedOntClass; 062import org.kepler.util.DotKeplerManager; 063import org.kepler.util.FileUtil; 064import org.kepler.util.ProvenanceStore; 065import org.kepler.util.RemoteStoreRunsTracker; 066import org.kepler.util.WorkflowRun; 067import org.kepler.util.WorkflowRunBuilder; 068import org.kepler.util.WorkflowRunUtil; 069 070import ptolemy.kernel.ComponentEntity; 071import ptolemy.kernel.util.IllegalActionException; 072import ptolemy.kernel.util.NameDuplicationException; 073import ptolemy.kernel.util.NamedObj; 074import ptolemy.moml.MoMLParser; 075 076public class WorkflowRunManager implements WorkflowRunManagerEventListener { 077 078 private static final Log log = LogFactory.getLog(WorkflowRunManager.class 079 .getName()); 080 private static final boolean isDebugging = log.isDebugEnabled(); 081 private static final int NUM_MILLISECONDS_IN_DAY = 86400000; 082 private static final int ONE_MILLISECOND = 1; 083 private static final String LESS_THAN = "<"; 084 private static final String GREATER_THAN = ">"; 085 private static final String PERCENTAGE = "%"; 086 087 private ArrayList<WorkflowRun> _allWorkflowRuns = new ArrayList<WorkflowRun>(); 088 private ArrayList<WeakReference<WorkflowRunManagerEventListener>> _listeners = 089 new ArrayList<WeakReference<WorkflowRunManagerEventListener>>(); 090 private Queryable _queryable = null; 091 private boolean _connected = false; 092 private boolean _usingSupportedRecordingType = true; 093 094 private Vector<KeplerLSID> _selectedRunsForSave = new Vector<KeplerLSID>(); 095 096 private WorkflowRunManagerRecording _workflowRunManagerRecording; 097 098 // provenanceStore associated w/ this WRM. 099 private ProvenanceStore _provenanceStore = null; 100 101 private ProvenanceRecorder _provenanceRecorder = null; 102 103 104 /** 105 * Constructor. 106 * @param provenanceStore 107 */ 108 protected WorkflowRunManager(ProvenanceStore provenanceStore) { 109 110 _provenanceStore = provenanceStore; 111 112 try { 113 //System.out.println("WorkflowRunManager constructor. creating _provenanceRecorder using " 114 // +_provenanceStore.getProvenanceConfigurationProperty().getName()); 115 _provenanceRecorder = new ProvenanceRecorder( 116 _provenanceStore.getProvenanceConfigurationProperty()); 117 } catch (IllegalActionException e) { 118 // TODO Auto-generated catch block 119 e.printStackTrace(); 120 } catch (NameDuplicationException e) { 121 // TODO Auto-generated catch block 122 e.printStackTrace(); 123 } 124 125 addWorkflowRunManagerEventListener(this); 126 127 try { 128 // create the recording and add to prov recorder's future 129 // piggybacks. 130 _workflowRunManagerRecording = new WorkflowRunManagerRecording(this); 131 ProvenanceRecorder.addPiggybackForFuture(_workflowRunManagerRecording); 132 } catch (RecordingException e) { 133 System.out.println("Error creating recording: " + e.getMessage()); 134 } 135 136 } 137 138 /** 139 * connect to provenance store via setting queryable to new SQLQueryV8 140 */ 141 public void connect() { 142 143 if (!_connected) { 144 try { 145 146 Map<String, String> pairsMap = ConfigurationUtilities.getPairsMap( 147 _provenanceStore.getProvenanceConfigurationProperty()); 148 String recordingType = pairsMap.get("Recording Type"); 149 150 if (!recordingType.equals("SQL-SPA-v8")){ 151 System.out.println("ERROR. WorkflowRunManager.connect unsupported recordingType:"+recordingType); 152 _connected = false; 153 _usingSupportedRecordingType = false; 154 } 155 else{ 156 _queryable = new SQLQueryV8(ConfigurationUtilities.getPairsMap( 157 _provenanceStore.getProvenanceConfigurationProperty())); 158 _connected = true; 159 _usingSupportedRecordingType = true; 160 } 161 } catch (QueryException e) { 162 _connected = false; 163 e.printStackTrace(); 164 } 165 } 166 } 167 168 public boolean isConnected() { 169 return _connected; 170 } 171 172 public void disconnect() throws IllegalActionException, QueryException { 173 if(_queryable != null) { 174 _queryable.disconnect(); 175 } 176 _connected = false; 177 } 178 179 public boolean usingSupportedRecordingType(){ 180 return _usingSupportedRecordingType; 181 } 182 183 /** 184 * Delete runs in provenance and from local list. Also if this 185 * WRM is connected to a remote store, delete runs from tracker file 186 * *if* they have been imported (not simply a preview run) 187 * then call updatePreviewRuns to put the preview run(s) back in the 188 * system. 189 * Calls fireWorkflowRunManagerEvent to notify listeners. 190 * @param recording 191 * @param lsids 192 * @param authenticate 193 * @return int numRowsDeleted 194 * @throws RecordingException 195 * @throws AuthenticationException 196 */ 197 public synchronized int deleteRuns(Recording recording, List<KeplerLSID> lsids, 198 boolean authenticate) throws RecordingException, AuthenticationException { 199 200 if (!(recording instanceof SQLRecordingV8)){ 201 throw new RecordingException("Error WorkflowRunManager deleteRuns, " + 202 "unexpected Recording type:"+recording.getClass().toString()); 203 } 204 205 //for (KeplerLSID lsid: lsids){ 206 // WorkflowRun run = getRun(lsid); 207 // run.resetType(WorkflowRun.type.Deleting.toString(), recording); 208 //} 209 210 int numRowsDeleted = ((SQLRecordingV8) recording).deleteExecutions(lsids); 211 int numRunsDeleted = deleteRunsLocally(lsids); 212 213 boolean needToUpdatePreviewRuns = false; 214 if (_provenanceStore.isRemoteKarStore()){ 215 RemoteStoreRunsTracker rsrt = _provenanceStore.getRemoteStoreRunsTracker(); 216 for (KeplerLSID lsid : lsids){ 217 if (rsrt.isKARImported(lsid.toString())){ 218 rsrt.deleteRunProperty(lsid.toString()); 219 needToUpdatePreviewRuns = true; 220 } 221 } 222 if (needToUpdatePreviewRuns){ 223 //now update to get the preview runs back. 224 updatePreviewRuns(authenticate); 225 } 226 } 227 228 // let everyone know, i.e. refresh all wrm tables 229 fireWorkflowRunManagerEvent(WorkflowRunManagerEvent.DELETION, lsids, true, null); 230 231 if (numRowsDeleted != numRunsDeleted){ 232 //System.out.println("WorkflowRunManager deleteRuns - WARNING numRowsDeleted:" 233 // +numRowsDeleted +" !="+" numRunsDeleted:"+numRunsDeleted); 234 } 235 return numRowsDeleted; 236 } 237 238 /** 239 * Delete runs from local list, CacheManager, and ObjectManager, but not 240 * from provenance. 241 * Do this e.g. when removing an older version of a run, e.g. during the 242 * change of a run from running to complete. 243 * 244 * @param execLSIDs 245 * @return numDeleted 246 */ 247 public synchronized int deleteRunsLocally(List<KeplerLSID> execLSIDs){ 248 249 Iterator<KeplerLSID> iter = execLSIDs.iterator(); 250 int numRunsDeleted = 0; 251 252 while(iter.hasNext()){ 253 try { 254 255 KeplerLSID lsid = iter.next(); 256 257 CacheManager cm = CacheManager.getInstance(); 258 if (cm.getObject(lsid) != null) { 259 cm.removeObject(lsid); 260 } 261 262 // NamedObj run = ObjectManager.getInstance().getObject(lsid); 263 // ObjectManager.getInstance().removeNamedObj(run); 264 ObjectManager.getInstance().removeObject(lsid); 265 266 WorkflowRun deletedRun = removeRun(lsid.toStringWithoutRevision()); 267 if (deletedRun != null){ 268 numRunsDeleted++; 269 } 270 } catch (CacheException e) { 271 // TODO Auto-generated catch block 272 e.printStackTrace(); 273 } catch (Exception e) { 274 // TODO Auto-generated catch block 275 e.printStackTrace(); 276 } 277 278 } 279 280 return numRunsDeleted; 281 // no need to send such an event currently: 282 ///fireWorkflowRunManagerEvent(WorkflowRunManagerEvent.LOCAL_DELETION, execLSIDs); 283 } 284 285 private synchronized WorkflowRun removeRun(String lsidWithoutRev){ 286 287 WorkflowRun run = null; 288 for (WorkflowRun tmpRun: _allWorkflowRuns){ 289 KeplerLSID runLSID = tmpRun.getExecLSID(); 290 String runLSIDWithoutRev = runLSID.toStringWithoutRevision(); 291 292 if (runLSIDWithoutRev.equals(lsidWithoutRev)){ 293 run = tmpRun; 294 break; 295 } 296 } 297 298 boolean deleted = _allWorkflowRuns.remove(run); 299 if (deleted){ 300 return run; 301 } 302 303 return null; 304 } 305 306 public synchronized WorkflowRun getRun(String lsidWithoutRev){ 307 308 for (WorkflowRun run: _allWorkflowRuns){ 309 KeplerLSID runLSID = run.getExecLSID(); 310 String runLSIDWithoutRev = runLSID.toStringWithoutRevision(); 311 if (runLSIDWithoutRev.equals(lsidWithoutRev)){ 312 return run; 313 } 314 } 315 return null; 316 } 317 318 public synchronized WorkflowRun getRun(KeplerLSID execLSID) { 319 320 for (WorkflowRun run : _allWorkflowRuns) { 321 KeplerLSID runLSID = run.getExecLSID(); 322 if (runLSID.equals(execLSID)) { 323 return run; 324 } 325 } 326 return null; 327 } 328 329 /** 330 * add runs to local list, and put in ObjectManager 331 */ 332 public synchronized void addRunsLocally(List<WorkflowRun> runs){ 333 334 //List<KeplerLSID> lsids = new ArrayList<KeplerLSID>(); 335 for(WorkflowRun run: runs){ 336 337 _allWorkflowRuns.add(run); 338 //lsids.add(run.getExecLSID()); 339 //WorkflowRunUtil.putInObjectManager(run, _queryable); 340 WorkflowRunUtil.putInObjectManager(run); 341 } 342 343 // no need to send such an event currently: 344 //fireWorkflowRunManagerEvent(WorkflowRunManagerEvent.LOCAL_EXEC_ADD, lsids, true, null); 345 } 346 347 348 public synchronized ArrayList<WorkflowRun> getWorkflowRuns(){ 349 return _allWorkflowRuns; 350 } 351 352 353 public synchronized void setWorkflowRuns(ArrayList<WorkflowRun> runs){ 354 _allWorkflowRuns = runs; 355 } 356 357 358 /** 359 * 360 * @param workflowNameSearchString 361 * @param userNameSearchString 362 * @param startTimeSearchString 363 * @param durationSearchString 364 * @param execIdSearchString 365 * @param tagSearchString 366 * @return 367 * @throws AuthenticationException 368 */ 369 public synchronized ArrayList<WorkflowRun> queryForAndSetRuns( 370 String workflowNameSearchString, String userNameSearchString, 371 String startTimeSearchString, String durationSearchString, 372 String execIdSearchString, String tagSearchString, 373 boolean authenticate) throws AuthenticationException { 374 if (isDebugging) { 375 log.debug(workflowNameSearchString); 376 log.debug(userNameSearchString); 377 log.debug(startTimeSearchString); 378 log.debug(durationSearchString); 379 log.debug(execIdSearchString); 380 log.debug(tagSearchString); 381 } 382 //System.out.println("WorkflowRunManager queryForAndSetRuns"); 383 384 // if provenance store is an ecogrid repo, first download and import into 385 // the reflection store preview runs, as necessary. 386 if (_provenanceStore.isRemoteKarStore()){ 387 updatePreviewRuns(authenticate); 388 } 389 390 ArrayList<WorkflowRun> runs = wrmQuery(workflowNameSearchString, 391 userNameSearchString, startTimeSearchString, 392 durationSearchString, execIdSearchString, tagSearchString); 393 394 log.debug("wrmQuery returned " + runs.size() + " runs"); 395 396 // now reset in-memory runs: 397 setWorkflowRuns(runs); 398 399 return runs; 400 } 401 402 /** 403 * @throws AuthenticationException 404 * 405 */ 406 private void updatePreviewRuns(boolean authenticate) throws AuthenticationException{ 407 408 List<WorkflowRunBuilder> workflowRunBuilders = _provenanceStore.downloadAllWorkflowRuns(authenticate); 409 log.debug("downloadAllWorkflowRuns returned " + workflowRunBuilders.size() + " workflowRunBuilders"); 410 411 RemoteStoreRunsTracker rsrt = _provenanceStore.getRemoteStoreRunsTracker(); 412 413 List<KeplerLSID> currentRemoteRunLSIDs = new ArrayList<KeplerLSID>(); 414 415 List<TrackerHelper> trackerHelpers = new ArrayList<TrackerHelper>(); 416 417 ProvenanceRecorder pr = getProvenanceRecorder(); 418 Recording recording = pr.getRecording(); 419 420 for (WorkflowRunBuilder workflowRunBuilder: workflowRunBuilders){ 421 422 WorkflowRun workflowRun = workflowRunBuilder.createWorkflowRun(); 423 if (workflowRun == null){ 424 continue; 425 } 426 currentRemoteRunLSIDs.add(workflowRun.getExecLSID()); 427 428 // KAR attributes necessary for KAR download. 429 KeplerLSID karLSID = workflowRunBuilder.getKARLSID(); 430 Long karFileSize = workflowRunBuilder.getKARFileSize(); 431 String karFileName = workflowRunBuilder.getKARFileName(); 432 if (karLSID == null || karFileSize == null || 433 karFileName == null){ 434 log.debug("workflowRunBuilder does not"+ 435 " contain KAR attributes necessary to download KAR, skipping run:"+workflowRun.getExecLSID()); 436 continue; 437 } 438 439 KeplerLSID originalRunLSIDInTracker = rsrt.getOriginalRunLSID(workflowRun.getExecLSID().toString()); 440 // if run not in tracker file, attempt to import it as a preview run 441 if (originalRunLSIDInTracker == null){ 442 443 log.debug("didn't find run with LSID "+ 444 workflowRun.getExecLSID() + " in tracker file, will attempt to import preview run"); 445 446 try { 447 KeplerLSID originalLSID = workflowRun.getExecLSID(); 448 boolean result = ((SQLRecordingV8) recording).insertPreviewRun(workflowRun); 449 450 if (result) { 451 TrackerHelper tih = new TrackerHelper(originalLSID, karLSID, karFileSize, 452 karFileName, workflowRun); 453 trackerHelpers.add(tih); 454 } 455 } catch (RecordingException e) { 456 // TODO Auto-generated catch block 457 e.printStackTrace(); 458 } 459 } 460 else{ 461 log.debug("no need to attempt to insertPreviewRun for " + 462 originalRunLSIDInTracker + " found it in the tracker file."); 463 } 464 } 465 466 log.debug(trackerHelpers.size() + " trackerHelpers"); 467 468 // now do majority of edits to RemoteStoreRunsTracker config, only 469 // write to disk on last iteration to improve speed. 470 for (int i=0; i<trackerHelpers.size(); i++){ 471 TrackerHelper tih = trackerHelpers.get(i); 472 WorkflowRun workflowRun = tih.getWorkflowRun(); 473 474 boolean serialize = false; 475 if (i == trackerHelpers.size()-1){ 476 serialize = true; 477 } 478 479 // update tracker file 480 boolean origRunLSIDset = rsrt.setOriginalRunLSID(tih.getOriginalLsid(), tih.getOriginalLsid(), serialize); 481 if (origRunLSIDset){ 482 rsrt.setKARLSID(tih.getOriginalLsid().toString(), tih.getKarLsid(), serialize); 483 rsrt.setKARFileSize(tih.getOriginalLsid().toString(), tih.getKarFileSize(), serialize); 484 rsrt.setKARFileName(tih.getOriginalLsid().toString(), tih.getKarFileName(), serialize); 485 rsrt.setExecId(tih.getOriginalLsid(), workflowRun.getExecId(), serialize); 486 } 487 488 // set run type to 'preview'. This changes run LSID. 489 workflowRun.resetType(WorkflowRun.type.Preview.toString(), recording); 490 // update tracker file 491 if (origRunLSIDset){ 492 rsrt.setPreviewRunLSID(tih.getOriginalLsid(), workflowRun.getExecLSID(), serialize); 493 } 494 495 // announce, though currently nothing much happens, wrm tablemodel 496 // runs haven't been set yet, so table can't show anything new 497 pr.executionImported(workflowRun.getExecLSID()); 498 } 499 500 501 // now remove any extraneous runs from tracker file 502 List<KeplerLSID> originalRunLSIDs = rsrt.getOriginalRunLSIDs(); 503 for (KeplerLSID lsid: originalRunLSIDs){ 504 if (!currentRemoteRunLSIDs.contains(lsid)){ 505 // only remove if it's still a preview run 506 // (i.e. user hasn't downloaded it) 507 if (!rsrt.isKARImported(lsid.toString())){ 508 KeplerLSID previewRunLSID = rsrt.getPreviewRunLSID(lsid.toString()); 509 boolean success = deletePreviewRun(previewRunLSID, authenticate); 510 System.out.println("WorkflowRunManager updatePreviewRuns " + 511 "remove extraneous preview run:"+previewRunLSID + " success?:"+success); 512 if (success){ 513 // update tracker file 514 success = rsrt.deleteRunProperty(lsid.toString()); 515 } 516 } 517 } 518 } 519 520 } 521 522 /** 523 * fire a programatic filter event for the benefit of any listeners. 524 * 525 * @param columnsAndFilters 526 */ 527 public void programticFilter(Map<String, String>columnsAndFilters){ 528 List<KeplerLSID> lsids = new ArrayList<KeplerLSID>(); 529 fireWorkflowRunManagerEvent(WorkflowRunManagerEvent.PROGRAMATIC_FILTER, lsids, true, columnsAndFilters); 530 } 531 532 /** 533 * Return ArrayList of WorkflowRuns that match given filters. 534 * 535 * @param workflowNameSearchString 536 * @param userNameSearchString 537 * @param startTimeSearchString 538 * @param durationSearchString 539 * @param execIdSearchString 540 * @param tagSearchString 541 * @return 542 */ 543 public ArrayList<WorkflowRun> filterRuns(String workflowNameSearchString, 544 String userNameSearchString, String startTimeSearchString, 545 String durationSearchString, String execIdSearchString, 546 String tagSearchString){ 547 548 //System.out.println("WorkflowRunManager filterRuns will check thru "+_allWorkflowRuns.size()+ " runs"); 549 550 ArrayList<WorkflowRun> filteredRuns = new ArrayList<WorkflowRun>(); 551 StartTimeQueryParse startTimeQueryParse = new StartTimeQueryParse( 552 startTimeSearchString); 553 DurationQueryParse durationQueryParse = new DurationQueryParse( 554 durationSearchString); 555 ExecIdQueryParse execIdQueryParse = new ExecIdQueryParse( 556 execIdSearchString); 557 558 559 for (WorkflowRun run: _allWorkflowRuns) { 560 try { 561 562 //filter on workflowNameSearchString 563 if (!workflowNameSearchString.equals(PERCENTAGE)){ 564 if (!run.getWorkflowName().matches("(?i).*"+workflowNameSearchString+".*")){ 565 continue; 566 } 567 } 568 569 //filter on userNameSearchString 570 if (!userNameSearchString.equals(PERCENTAGE)){ 571 if(!run.getUser().matches("(?i).*"+userNameSearchString+".*")){ 572 continue; 573 } 574 } 575 576 //filter on startTimeSearchString 577 if (!startTimeSearchString.equals(PERCENTAGE)){ 578 if (!startTimeQueryParse.passesFilter(run.getStartTime())){ 579 continue; 580 } 581 } 582 583 //filter on durationSearchString 584 if (!durationSearchString.equals(PERCENTAGE)){ 585 //TODO: if we ever upgrade to allow decimals, change: 586 if (!durationQueryParse.passesFilter(run.getDuration().intValue())){ 587 continue; 588 } 589 } 590 591 //filter on execIdSearchString 592 if (!execIdSearchString.equals(PERCENTAGE)){ 593 if (!execIdQueryParse.passesFilter(run.getExecId())){ 594 continue; 595 } 596 } 597 598 boolean passedFilter = true; 599 //filter on tagSearchString 600 if (!tagSearchString.equals(PERCENTAGE)){ 601 Map<NamedOntClass, String> tags = run.getTags(); 602 String regex = "\\s*,\\s*"; 603 String[] stringFragments = tagSearchString.split(regex); 604 for (String fragment : stringFragments) { 605 fragment = fragment.toLowerCase(); 606 boolean matchFound = false; 607 for (NamedOntClass tag : tags.keySet()) { 608 String tagName = tag.getName().toLowerCase(); 609 if (tagName.contains(fragment)) { 610 matchFound = true; 611 break; 612 } 613 } 614 if (!matchFound) { 615 passedFilter = false; 616 break; 617 } 618 } 619 } 620 621 if (!passedFilter) { 622 continue; 623 } 624 625 //run passed all filters, add it 626 filteredRuns.add(run); 627 628 } catch (Exception e) { // TODO Auto-generated catch block 629 e.printStackTrace(); } 630 } 631 632 //System.out.println("WorkflowRunManager filterRuns returning " + filteredRuns.size() + " runs"); 633 return filteredRuns; 634 } 635 636 /** 637 * Return the ComponenteEntity that was run to create the WorkflowRun that 638 * is identified by the given LSID. This method first checks to see if there 639 * is an existing object in the ObjectManager (which also checks the 640 * CacheManager if no object is found in the ObjectManager). If no object is 641 * found in the ObjectManager and no new object is created from the CacheManager 642 * then this method checks the provenance database for the workflow and 643 * parses a new NamedObj from the moml string stored there. 644 * 645 * @param runLSID 646 * @return 647 */ 648 public ComponentEntity getAssociatedWorkflowForWorkflowRun(KeplerLSID runLSID) { 649 650 ObjectManager om = ObjectManager.getInstance(); 651 try { 652 NamedObj workflow = null; 653 WorkflowRun run = (WorkflowRun)om.getObjectRevision(runLSID); 654 if (run != null){ 655 KeplerLSID workflowLSID = run.getWorkflowLSID(); 656 workflow = om.getObjectRevision(workflowLSID); 657 } 658 if (workflow == null) { 659 String moml = _queryable.getMoMLForExecution(runLSID); 660 MoMLParser parser = new MoMLParser(); 661 workflow = parser.parse(moml); 662 } 663 if (workflow == null) { 664 return null; 665 } else { 666 if (workflow instanceof ComponentEntity) { 667 om.addNamedObj(workflow); 668 return (ComponentEntity)workflow; 669 } else { 670 if (isDebugging) 671 log.debug("workflow was not a ComponentEntity"); 672 return null; 673 } 674 } 675 } catch (Exception e) { 676 log.debug("couldn't get workflow from provenance"); 677 return null; 678 } 679 } 680 681 /** 682 * Return runs from the list in memory that are associated with particular workflow. 683 * 684 * @param workflowLSID 685 * @return 686 */ 687 public Vector<WorkflowRun> getWorkflowRunsForWorkflow(KeplerLSID workflowLSID) { 688 Vector<WorkflowRun> runsAssociatedWithWorkflow = new Vector<WorkflowRun>(); 689 for(WorkflowRun wr : _allWorkflowRuns){ 690 KeplerLSID thisWorkflowLSID = wr.getWorkflowLSID(); 691 if (thisWorkflowLSID.equals(workflowLSID)) { 692 runsAssociatedWithWorkflow.add(wr); 693 } 694 } 695 return runsAssociatedWithWorkflow; 696 } 697 698 public Queryable getQueryable() { 699 return _queryable; 700 } 701 702 public WorkflowRunManagerRecording getWorkflowRunManagerRecording(){ 703 return _workflowRunManagerRecording; 704 } 705 706 /** 707 * Query provenance and create and return WorkflowRuns that 708 * match given query terms. 709 * 710 * Note: this is currently no longer utilized with user-entered search 711 * terms, we're just using it to get *all* runs now. 712 * 713 * @param workflowName 714 * @param userName 715 * @param startTimeSearchString 716 * @param durationSearchString 717 * @param execIdSearchString 718 * @param tagsSearchString 719 * @return 720 */ 721 public ArrayList<WorkflowRun> wrmQuery(String workflowName, 722 String userName, String startTimeSearchString, 723 String durationSearchString, String execIdSearchString, 724 String tagsSearchString) { 725 726 if (isDebugging) { 727 log.debug(workflowName); 728 log.debug(userName); 729 log.debug(startTimeSearchString); 730 log.debug(durationSearchString); 731 log.debug(execIdSearchString); 732 log.debug(tagsSearchString); 733 } 734 735 ArrayList<WorkflowRun> results = new ArrayList<WorkflowRun>(); 736 List<KeplerLSID> execLSIDs = new LinkedList<KeplerLSID>(); 737 738 StartTimeQueryParse startTimeQueryParse = new StartTimeQueryParse( 739 startTimeSearchString); 740 741 DurationQueryParse durationQueryParse = new DurationQueryParse( 742 durationSearchString); 743 744 try { 745 // TODO hsql 1.7 doesn't support timestampdiff or other timestamp 746 // arithmetic, afaict 747 // it's a new feature in 1.9 (in alpha currently). So I'm just 748 // filtering on Duration in java 749 // (instead of in an sql select) until 1.9 is released and we 750 // upgrade. 751 752 int[] execIds = getExecIds(execIdSearchString); 753 int execIdAfter = 0; 754 int execIdBefore = Integer.MAX_VALUE; 755 Timestamp epic = new Timestamp(0); 756 // get a timestamp one day in the future. 757 // NOTE: previously this was set to = new Timestamp(Long.MAX_VALUE), 758 // but postgres does not that a timestamp that large. 759 Timestamp distantFuture = new Timestamp(new Date().getTime() + 10*NUM_MILLISECONDS_IN_DAY); 760 761 if (execIds != null) { 762 execIdAfter = execIds[0]; 763 execIdBefore = execIds[1]; 764 } 765 766 // Get workflow runs data from provenance 767 // 768 if (startTimeSearchString.equals(PERCENTAGE)) { 769 Timestamp after = epic; 770 Timestamp before = distantFuture; 771 execLSIDs = _queryable.getExecutionLSIDsForWorkflowRuns( 772 workflowName, userName, after, before, execIdAfter, 773 execIdBefore); 774 } else if (startTimeQueryParse.isValid()) { 775 776 if (startTimeQueryParse.isRange()) { 777 // FIXME hardcodes: 778 Timestamp after = new Timestamp( 779 startTimeQueryParse.dates[0].getTime()); 780 Timestamp before = new Timestamp( 781 startTimeQueryParse.dates[1].getTime()); 782 execLSIDs = _queryable.getExecutionLSIDsForWorkflowRuns( 783 workflowName, userName, after, before, execIdAfter, 784 execIdBefore); 785 } else { 786 787 if ((startTimeQueryParse.getOperator()) != null) { 788 if ((startTimeQueryParse.getOperator()) 789 .equals(LESS_THAN)) { 790 // we want < but getExecutionLSIDsForWorkflowRuns 791 // is inclusive. so subtract one millisecond (hsqldb 792 // timestamps in milliseconds): 793 Timestamp before = new Timestamp( 794 startTimeQueryParse.dates[0].getTime() 795 - ONE_MILLISECOND); 796 Timestamp after = epic; 797 execLSIDs = _queryable 798 .getExecutionLSIDsForWorkflowRuns(workflowName, 799 userName, after, before, 800 execIdAfter, execIdBefore); 801 } else { 802 // we want > but getExecutionLSIDsForWorkflowRuns 803 // is inclusive. so add one millisecond (hsqldb 804 // timestamps in milliseconds): 805 Timestamp after = new Timestamp( 806 startTimeQueryParse.dates[0].getTime() 807 + ONE_MILLISECOND); 808 Timestamp before = distantFuture; 809 execLSIDs = _queryable 810 .getExecutionLSIDsForWorkflowRuns(workflowName, 811 userName, after, before, 812 execIdAfter, execIdBefore); 813 } 814 } else { 815 Timestamp after = new Timestamp( 816 startTimeQueryParse.dates[0].getTime() 817 + ONE_MILLISECOND); 818 Timestamp before = new Timestamp( 819 startTimeQueryParse.dates[0].getTime() 820 + (ONE_MILLISECOND * 2)); 821 if (startTimeQueryParse.justDateNoTime[0]) { 822 after = new Timestamp(startTimeQueryParse.dates[0] 823 .getTime() 824 + ONE_MILLISECOND); 825 before = new Timestamp(startTimeQueryParse.dates[0] 826 .getTime() 827 + NUM_MILLISECONDS_IN_DAY - ONE_MILLISECOND); 828 } 829 // TODO need a way to determine if just hours but no 830 // mins and secs specified, etc 831 execLSIDs = _queryable.getExecutionLSIDsForWorkflowRuns( 832 workflowName, userName, after, before, 833 execIdAfter, execIdBefore); 834 } 835 } 836 } 837 838 // if a tagSearchString is given, filter executions list. 839 if (!tagsSearchString.equals(PERCENTAGE)) { 840 841 List<Integer> executionIdsForTags = _queryable 842 .getExecutionIdsForTags(tagsSearchString); 843 844 List<KeplerLSID>executionLSIDsForTags = new ArrayList<KeplerLSID>(); 845 Iterator<Integer> itr = executionIdsForTags.iterator(); 846 while (itr.hasNext()){ 847 848 KeplerLSID execLSID = _queryable.getExecutionLSIDForExecution(itr.next()); 849 executionLSIDsForTags.add(execLSID); 850 } 851 852 execLSIDs.retainAll(executionLSIDsForTags); 853 } 854 855 // create Workflow Runs, filtered by Duration if necessary. 856 if (!execLSIDs.isEmpty()) { 857 Map<KeplerLSID, WorkflowRun> runsForExecutions = _queryable 858 .getWorkflowRunsForExecutionLSIDs(execLSIDs); 859 860 Iterator<WorkflowRun> runItr = runsForExecutions.values() 861 .iterator(); 862 while (runItr.hasNext()) { 863 864 WorkflowRun run = runItr.next(); 865 866 if (durationQueryParse.isValid()) { 867 if (durationQueryParse.passesFilter(run.getDuration().intValue())){ 868 results.add(run); 869 } 870 } else { 871 results.add(run); 872 } 873 874 } 875 } 876 } catch (QueryException e1) { 877 // TODO Auto-generated catch block 878 e1.printStackTrace(); 879 } catch (Exception e) { 880 // TODO Auto-generated catch block 881 e.printStackTrace(); 882 } 883 884 return results; 885 } 886 887 //TODO can this be deleted and use ExecIdQueryParse passesFilter ? 888 private int[] getExecIds(String execIdSearchString) { 889 ExecIdQueryParse execIdQueryParse = new ExecIdQueryParse( 890 execIdSearchString); 891 892 int[] results = new int[2]; 893 894 if (execIdSearchString.equals(PERCENTAGE)) { 895 return null; 896 } else if (execIdQueryParse.isValid()) { 897 898 if (execIdQueryParse.isRange()) { 899 results[0] = execIdQueryParse.parsedExecIds[0]; 900 results[1] = execIdQueryParse.parsedExecIds[1]; 901 return results; 902 } else { 903 if (!(execIdQueryParse.getOperator()).equals("")) { 904 if (execIdQueryParse.getOperator().equals(LESS_THAN)) { 905 results[0] = 0; // first possible execId is actually 1 906 // NOTE since we're using the same inclusive query for 907 // all queries, subtract 1: 908 results[1] = execIdQueryParse.parsedExecIds[0] - 1; 909 } else if (execIdQueryParse.getOperator().equals(GREATER_THAN)){ 910 // NOTE since we're using the same inclusive query for 911 // all queries, add 1: 912 results[0] = execIdQueryParse.parsedExecIds[0] + 1; 913 results[1] = Integer.MAX_VALUE; // end of int 914 } 915 return results; 916 } 917 // else { 918 // } 919 } 920 } 921 return null; 922 } 923 924 public void clearSelectedRuns(){ 925 _selectedRunsForSave.clear(); 926 } 927 928 public void addRunToSelectedRuns(KeplerLSID runLSID){ 929 _selectedRunsForSave.add(runLSID); 930 } 931 932 public Vector<KeplerLSID>getSelectedRunsForSave(){ 933 return _selectedRunsForSave; 934 } 935 936 public synchronized void addWorkflowRunManagerEventListener( 937 WorkflowRunManagerEventListener l) { 938 if (!_listeners.contains(l)) { 939 _listeners.add(new WeakReference<WorkflowRunManagerEventListener>(l)); 940 //System.out.println("WorkflowRunManager addWorkflowRunManagerEventListener("+l.getClass()+") added to _listeners list size:"+_listeners.size()); 941 } 942 } 943 944 public synchronized void removeWorkflowRunManagerEventListener( 945 WorkflowRunManagerEventListener l) { 946 for (int i = 0; i < _listeners.size(); i++) { 947 WeakReference<WorkflowRunManagerEventListener> reference = _listeners.get(i); 948 WorkflowRunManagerEventListener listener = reference.get(); 949 if(listener == null || listener == l) { 950 _listeners.remove(i); 951 i--; 952 } 953 } 954 } 955 956 //dispose when the frame is closed. 957 public synchronized void dispose( 958 WorkflowRunManagerEventListener l) { 959 removeWorkflowRunManagerEventListener(l); 960 } 961 962 /** 963 * Download run KAR using the LSID of the preview run stand-in. 964 * Replace preview run by removing it, and then importing run KAR. 965 * @param previewRunLSID 966 * @return 967 * @throws AuthenticationException 968 */ 969 public boolean downloadRunKAR(KeplerLSID previewRunLSID, boolean authenticate) throws AuthenticationException { 970 971 RemoteStoreRunsTracker rsrt = _provenanceStore.getRemoteStoreRunsTracker(); 972 973 KeplerLSID originalRunLSID = rsrt.getOriginalRunLSID(previewRunLSID.toString()); 974 String karFileName = rsrt.getKARFileName(previewRunLSID.toString()); 975 Long karFileSize = rsrt.getKARFileSize(previewRunLSID.toString()); 976 KeplerLSID karLSID = rsrt.getKARLSID(previewRunLSID.toString()); 977 978 log.debug("going to try to download KAR " + karFileName 979 + " with size:" + karFileSize + " lsid:" + karLSID); 980 Module wrmModule = ConfigurationManager.getModule(WRMDefaults.moduleName); 981 File wrmTransientDownloadDir = new File( 982 DotKeplerManager.getInstance(). 983 getTransientModuleDirectory(wrmModule.getStemName()).toString() + 984 File.separator + 985 this._provenanceStore.getEcogridRepository().getName()); 986 987 KARDownloader karDownloader = KARDownloader.getInstance(); 988 karDownloader.setKarName(karFileName); 989 karDownloader.setKarPath(wrmTransientDownloadDir.toString()); 990 String ecoGridRepositoryName = _provenanceStore.getEcogridRepository().getName(); 991 File downloadedKAR = karDownloader.download(karLSID.toString(), karFileSize, 992 ecoGridRepositoryName, authenticate); 993 994 if (downloadedKAR == null){ 995 // Actually just leave preview run, less confusing, 996 // and if download fails due to network problem, 997 // (instead of external delete) user may try again. 998 //if (deletePreviewRun(previewRunLSID)){ 999 // rsrt.deleteRunProperty(previewRunLSID); 1000 //} 1001 return false; 1002 } 1003 1004 if (deletePreviewRun(previewRunLSID, authenticate)){ 1005 //System.out.println("WorkflowRunManager downloadRunKAR successfully deleted preview run:"+previewRunLSID); 1006 ProvenanceRecorder pr = getProvenanceRecorder(); 1007 Recording recording = pr.getRecording(); 1008 //TODO check if this piggyback stuff is necessary 1009 // add any piggybacks that may have been added after this 1010 // ProvenanceRecorder was originally created 1011 List<Recording> futuristicPiggyBacks = ProvenanceRecorder 1012 .getFuturisticPiggybacks(recording); 1013 Iterator<Recording> i = futuristicPiggyBacks.iterator(); 1014 while (i.hasNext()) { 1015 pr.addPiggyback(i.next()); 1016 } 1017 1018 try{ 1019 KARFile karFile = new KARFile(downloadedKAR); 1020 List<KAREntry> karEntries = karFile.karEntries(); 1021 KAREntry runEntry = null; 1022 1023 for (KAREntry karEntry: karEntries){ 1024 if (karEntry.getLSID().equals(originalRunLSID)){ 1025 //System.out.println("WorkflowRunManager downloadRunKAR found the run karEntry in the downloaded kar"); 1026 runEntry = karEntry; 1027 break; 1028 } 1029 } 1030 1031 if (runEntry != null){ 1032 InputStream wfRunStream = karFile.getInputStream(runEntry); 1033 String wfRunString = FileUtil.convertStreamToString(wfRunStream); 1034 MoMLParser parser = new MoMLParser(); 1035 NamedObj no = parser.parse(wfRunString); 1036 WorkflowRun run = (WorkflowRun) no; 1037 //System.out.println("WorkflowRunManager downloadRunKAR created run from karEntry. will try to insertHuskRun. has execLSID:"+run.getExecLSID()); 1038 1039 boolean result = ((SQLRecordingV8) recording).insertHuskRun(karFile, 1040 run); 1041 1042 if (result) { 1043 rsrt.setImportedKARRunLSID(originalRunLSID.toString(), run.getExecLSID(), true); 1044 pr.executionImported(run.getExecLSID()); 1045 rsrt.setExecId(originalRunLSID, run.getExecId(), true); 1046 } 1047 } 1048 1049 } catch (RecordingException e) { 1050 // TODO Auto-generated catch block 1051 e.printStackTrace(); 1052 } catch (IOException e) { 1053 // TODO Auto-generated catch block 1054 e.printStackTrace(); 1055 } catch (Exception e) { 1056 // TODO Auto-generated catch block 1057 e.printStackTrace(); 1058 } 1059 } 1060 1061 // delete downloaded KAR, no longer needed 1062 File karFilepath = new File(wrmTransientDownloadDir + File.separator + karFileName); 1063 log.debug("deleting temp KAR: " + karFilepath); 1064 karFilepath.delete(); 1065 1066 return true; 1067 } 1068 1069 /** 1070 * Delete run from provenance and memory, does NOT 1071 * update tracker file. 1072 * @param previewRunLSID 1073 * @return boolean 1074 * @throws AuthenticationException 1075 */ 1076 private boolean deletePreviewRun(KeplerLSID previewRunLSID, boolean authenticate) throws AuthenticationException { 1077 ProvenanceRecorder pr = getProvenanceRecorder(); 1078 Recording recording = pr.getRecording(); 1079 List<KeplerLSID>runsToDelete = new ArrayList<KeplerLSID>(); 1080 runsToDelete.add(previewRunLSID); 1081 try { 1082 int numDeleted = deleteRuns(recording, runsToDelete, authenticate); 1083 if (numDeleted == 1){ 1084 //System.out.println("WorkflowRunManager deletePreviewRun successfully deleted preview run:"+previewRunLSID); 1085 return true; 1086 } 1087 return false; 1088 } catch (RecordingException e) { 1089 // TODO Auto-generated catch block 1090 e.printStackTrace(); 1091 } 1092 return false; 1093 } 1094 1095 public synchronized void fireWorkflowRunManagerEvent(String eventName, KeplerLSID lsid, boolean isLastEvent) { 1096 List<KeplerLSID>lsids = new ArrayList<KeplerLSID>(); 1097 lsids.add(lsid); 1098 fireWorkflowRunManagerEvent(eventName, lsids, isLastEvent, null); 1099 } 1100 1101 public synchronized void fireWorkflowRunManagerEvent(String eventName, List<KeplerLSID> lsids, 1102 boolean isLastEvent, Object eventData) { 1103 WorkflowRunManagerEvent wfrmEvent = new WorkflowRunManagerEvent(this, eventName, lsids, 1104 isLastEvent, eventData); 1105 //System.out.println("WorkflowRunManager with provStore:"+_provenanceStore.getType()+ 1106 // " fireWorkflowRunManagerEvent going to iter thru "+_listeners.size()+" listeners"); 1107 for (int i = 0; i < _listeners.size(); i++) { 1108 WeakReference<WorkflowRunManagerEventListener> reference = _listeners.get(i); 1109 WorkflowRunManagerEventListener listener = reference.get(); 1110 if(listener != null) { 1111 //System.out.println("WorkflowRunManager with provStore:"+_provenanceStore.getType()+ 1112 // " fireWorkflowRunManagerEvent listener("+i+") != null, " + 1113 // "going to listener.workflowRunManagerEventOccurred(wfrmEvent). listener:"+listener.getClass()); 1114 listener.workflowRunManagerEventOccurred(wfrmEvent); 1115 }else if (listener == null ){ 1116 //System.out.println("WorkflowRunManager fireWorkflowRunManagerEvent listener("+i+") == null, removing listener from list"); 1117 _listeners.remove(i); 1118 i--; 1119 } 1120 1121 } 1122 } 1123 1124 1125 1126 public void workflowRunManagerEventOccurred(WorkflowRunManagerEvent evt) { 1127 1128 //System.out.println("WorkflowRunManager workflowRunManagerEventOccurred(evt:"+evt.getEventName()+")" + 1129 // " source: "+evt.getSource()); 1130 1131 if (_queryable == null){ 1132 System.out.println("WARN WorkflowRunManager workflowRunManagerEventOccurred(evt:"+evt.getEventName()+")" + 1133 " source: "+evt.getSource() + " but _queryable == null, doing nothing"); 1134 return; 1135 } 1136 1137 if (evt.getEventName().equals(WorkflowRunManagerEvent.EXECUTION_START)){ 1138 1139 try { 1140 List<KeplerLSID> execLSIDs = evt.getEventLSIDs(); 1141 Map<KeplerLSID, WorkflowRun> runsToAdd = _queryable.getWorkflowRunsForExecutionLSIDs(execLSIDs); 1142 List<WorkflowRun> runs = new ArrayList<WorkflowRun>(runsToAdd.values()); 1143 addRunsLocally(runs); 1144 } catch (QueryException e) { 1145 // TODO Auto-generated catch block 1146 e.printStackTrace(); 1147 } 1148 } else if (evt.getEventName().equals(WorkflowRunManagerEvent.EXECUTION_STOP) || 1149 evt.getEventName().equals(WorkflowRunManagerEvent.EXECUTION_ERROR) || 1150 evt.getEventName().equals(WorkflowRunManagerEvent.TAG_ADDED) || 1151 evt.getEventName().equals(WorkflowRunManagerEvent.TAG_REMOVED) || 1152 evt.getEventName().equals(WorkflowRunManagerEvent.EXECUTION_IMPORTED)){ 1153 1154 try { 1155 List<KeplerLSID> execLSIDs = evt.getEventLSIDs(); 1156 1157 1158 /* an alternate approach would be to not reconstruct runs here, 1159 * instead just add or delete the relevant tags on existing runs 1160 List<WorkflowRun> runs = new ArrayList<WorkflowRun>(); 1161 Iterator<KeplerLSID> itr = execLSIDs.iterator(); 1162 while (itr.hasNext()){ 1163 KeplerLSID execLSID = itr.next(); 1164 int execId = queryable.getExecutionForExecutionLSID(execLSID); 1165 Map<NamedOntClass, String>tags = queryable.getTagClassesForExecutionId(execId); 1166 WorkflowRun run = allWorkflowRuns.get(execLSID.toStringWithoutRevision()); 1167 //run.setTags(tags); //need to add add and delete methods, don't trample existing 1168 runs.add(run); 1169 } 1170 if (execLSIDs.size() == runs.size()){ 1171 deleteRunsLocally(execLSIDs); 1172 addRunsLocally(runs); 1173 } 1174 else{ 1175 System.out.println("ERROR execLSIDs.size() != runs.size() this may have caused problems!!!!!!!!!!!!"); 1176 } 1177 */ 1178 1179 Map<KeplerLSID, WorkflowRun> runsToAdd = _queryable.getWorkflowRunsForExecutionLSIDs(execLSIDs); 1180 List<WorkflowRun> runs = new ArrayList<WorkflowRun>(runsToAdd.values()); 1181 1182 // protects against bug#4919, where 2 TAG_REMOVED events occur per 1 tag deletion 1183 // if this is not here, the 2nd event deletes the run, but adds no run back because of the 1184 // LSID revision bump. 1185 //System.out.println("WokflowRunManager workflowRunManagerEventOccurred checking " + 1186 // "if execLSIDs.size() (execLSIDs from getEventLSIDs):"+execLSIDs.size()+" equals runs.size() (fetched from getWorkflowRunsForExecutionLSIDs)"+runs.size()); 1187 if (execLSIDs.size() == runs.size()){ 1188 //first delete any old versions (e.g. added during executionStart event) 1189 deleteRunsLocally(execLSIDs); 1190 addRunsLocally(runs); 1191 } 1192 1193 // if this is a tag event on an imported run in a reflection store, 1194 // need to update importedKARRunLSID in tracker file 1195 if (_provenanceStore.isRemoteKarStore() && 1196 (evt.getEventName().equals(WorkflowRunManagerEvent.TAG_ADDED) || 1197 evt.getEventName().equals(WorkflowRunManagerEvent.TAG_REMOVED)) ){ 1198 for (WorkflowRun run: runs){ 1199 RemoteStoreRunsTracker rsrt = _provenanceStore.getRemoteStoreRunsTracker(); 1200 rsrt.setImportedKARRunLSID(run.getExecId().toString(), run.getExecLSID(), true); 1201 } 1202 } 1203 1204 } catch (QueryException e) { 1205 // TODO Auto-generated catch block 1206 e.printStackTrace(); 1207 } 1208 } else if (evt.getEventName().equals(WorkflowRunManagerEvent.WORKFLOW_RENAMED)){ 1209 1210 try { 1211 List<KeplerLSID> oldLSIDsForWorkflows = evt.getEventLSIDs(); 1212 List<KeplerLSID> runsOfWorkflows = new ArrayList<KeplerLSID>(); 1213 1214 Iterator<WorkflowRun>itr = _allWorkflowRuns.iterator(); 1215 WorkflowRun run; 1216 while (itr.hasNext()) { 1217 run = itr.next(); 1218 if (oldLSIDsForWorkflows.contains(run.getWorkflowLSID())){ 1219 runsOfWorkflows.add(run.getExecLSID()); 1220 } 1221 } 1222 1223 if (runsOfWorkflows.size() < 1){ 1224 return; 1225 } 1226 1227 deleteRunsLocally(runsOfWorkflows); 1228 Map<KeplerLSID, WorkflowRun> runsToAdd = _queryable.getWorkflowRunsForExecutionLSIDs(runsOfWorkflows); 1229 List<WorkflowRun> runs = new ArrayList<WorkflowRun>(runsToAdd.values()); 1230 addRunsLocally(runs); 1231 1232 } catch (QueryException e) { 1233 // TODO Auto-generated catch block 1234 e.printStackTrace(); 1235 } 1236 1237 } 1238 1239 } 1240 1241 public ProvenanceStore getProvenanceStore(){ 1242 return _provenanceStore; 1243 } 1244 1245 public ProvenanceRecorder getProvenanceRecorder(){ 1246 if (_provenanceRecorder == null){ 1247 System.out.println("WARNING WorkflowRunManager getProvenanceRecorder returning null"); 1248 } 1249 return _provenanceRecorder; 1250 } 1251 1252 //public Recording getRecording(){ 1253 // if (_recording == null){ 1254 // System.out.println("WARNING WorkflowRunManager getRecording returning null"); 1255 // } 1256 // return _recording; 1257 //} 1258 1259 private static class TrackerHelper{ 1260 KeplerLSID originalLsid = null; 1261 KeplerLSID karLsid = null; 1262 Long karFileSize = null; 1263 String karFileName = null; 1264 WorkflowRun workflowRun = null; 1265 1266 public TrackerHelper(KeplerLSID originalLsid, KeplerLSID karLsid, 1267 Long karFileSize, String karFileName, WorkflowRun workflowRun) { 1268 this.originalLsid = originalLsid; 1269 this.karLsid = karLsid; 1270 this.karFileSize = karFileSize; 1271 this.karFileName = karFileName; 1272 this.workflowRun = workflowRun; 1273 } 1274 1275 public KeplerLSID getOriginalLsid(){ 1276 return originalLsid; 1277 } 1278 public KeplerLSID getKarLsid(){ 1279 return karLsid; 1280 } 1281 public Long getKarFileSize(){ 1282 return karFileSize; 1283 } 1284 public String getKarFileName(){ 1285 return karFileName; 1286 } 1287 public WorkflowRun getWorkflowRun(){ 1288 return workflowRun; 1289 } 1290 } 1291 1292} 1293 1294