/////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2009 OPeNDAP, Inc.
// All rights reserved.
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL OPeNDAP BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
// SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
// THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF OPeNDAP HAS BEEN ADVISED
// OF THE POSSIBILITY OF SUCH DAMAGE.
//
// OPeNDAP SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
// PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS"
// BASIS, AND OPeNDAP HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
// UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
//
// Author: Nathan David Potter <ndp@opendap.org>
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
//
/////////////////////////////////////////////////////////////////////////////

package org.kepler.dataproxy.datasource.opendap;

import java.util.Collection;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import opendap.clients.odc.ApplicationController;
import opendap.clients.odc.DodsURL;
import opendap.dap.DArray;
import opendap.dap.DConnect2;
import opendap.dap.DConstructor;
import opendap.dap.DDS;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.lib.Source;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.FileParameter;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.Type;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.moml.MoMLChangeRequest;

/**
 * The OPeNDAP actor reads data from OPeNDAP data sources (i.e. servers).
 *
 * <h1>OPeNDAP Actor Overview</h1>
 *
 * The OPeNDAP actor provides access to data served by any Data Access Protocol
 * (DAP) 2.0 compatible data source. The actor takes as configuration parameters
 * the URL to the data source and an optional constraint expression (CE). Based
 * on the URL and optional CE, the actor configures its output ports to match
 * the variables to be read from the data source.
 *
 * <h2>More information about the OPeNDAP actor</h2>
 *
 * The OPeNDAP actor reads data from a single DAP data server and provides that
 * data as either a vector, matrix or array for processing by downstream
 * elements in a Kepler workflow. Each DAP server provides (serves) many data
 * sources and each of those data sources can be uniquely identified using a URL
 * in a way that's similar to how pages are provided by a web server. For more
 * information on the DAP and on OPeNDAP's software, see www.opendap.org.
 *
 * <h3>Characterization of Data Sources</h3>
 *
 * Data sources accessible using DAP 2.0 are characterized by a URL that
 * references both a specific data server and a data granule available from
 * that server and a Constraint Expression that describes which variables to
 * read from within the data granule. In addition to reading data from a
 * granule, a DAP 2.0 server can provide two pieces of information about the
 * granule: a description of all of its variables, their names and their data
 * types; and a collection of 'attributes' which are bound to those variables.
 *
 * <h3>Operation of the Actor</h3>
 *
 * The actor must have a valid URL before it can provide any information (just
 * as a file reader actor needs to point toward a file to provide data). Given a
 * URL and the optional CE, the OPeNDAP actor will interrogate that data source
 * and configure its output ports.
 *
 * <h3>Data Types Returned by the Actor</h3>
 *
 * There are two broad classes of data types returned by the actor. First there
 * are vectors, matrices and arrays. These correspond to one, two and N (&gt; 2)
 * dimensional arrays. The distinction between the vector and matrix types and
 * the N-dimensional array is that Kepler can operate on the vector and matrix
 * types far more efficiently than the N-dimensional arrays. Many variables
 * present in DAP data sources are of the N-dimensional array class and one way
 * to work with these efficiently is to use the constraint expression to reduce
 * the order of these data to one or two, thus causing the actor to store them
 * in a vector or matrix.
 *
 * <p>
 * As an example, consider the FNOC1 data source available at test.opendap.org.
 * The full URL for this is http://test.opendap.org/opendap/data/nc/fnoc1.nc. It
 * contains a variable 'u' which has three dimensions. We can constrain 'u' so
 * that it has only two dimensions when read into Kepler using the CE
 * 'u[0][0:16][0:20]' which selects only the first element (index 0) for the
 * first dimension while requesting all of the remaining elements for the second
 * and third dimensions. The www.opendap.org site has documentation about the CE
 * syntax.
 * </p>
 *
 * <p>
 * The second data type returned by the actor is a record. In reality, all DAP
 * data sources are records but the actor automatically 'disassembles' the top
 * most record since we know that's what the vast majority of users will want.
 * However, some data sources contain nested hierarchies of records many levels
 * deep. When dealing with those data sources you will need to use the Kepler
 * record disassembler in your work flow.
 * </p>
 *
 * @author Nathan Potter
 * @version $Id: OpendapDataSourceODC.java 33629 2015-08-24 22:43:10Z crawl $
 * @since Kepler 1.0RC1
 * @date Jul 17, 2007
 */
public class OpendapDataSourceODC extends Source {

    /** Logger shared by the actor and its helper thread. */
    static final Log log = LogFactory
            .getLog("org.kepler.dataproxy.datasource.opendap.OpendapDataSource");

    /** ODC configuration directory, relative to the KEPLER system property. */
    private static final String OPENDAP_CONFIG_DIR = "/configs/ptolemy/configs/kepler/opendap";

    /**
     * The OPeNDAP URL that identifies a (possibly constrained) dataset.
     */
    public FileParameter opendapURLParameter = null;

    /**
     * The OPeNDAP Constraint Expression used to sub sample the dataset.
     */
    public FileParameter opendapCEParameter = null;

    /**
     * Boolean parameter; setting it to true launches the ODC browser.
     */
    // *** Remove. jhrg
    public Parameter runODC = null;

    /** Last URL seen by {@link #attributeChanged}; used to detect changes. */
    private String opendapURL;

    /** Last CE seen by {@link #attributeChanged}; used to detect changes. */
    private String opendapCE;

    /** Connection to the data source; null until a URL has been loaded. */
    private DConnect2 dapConnection;

    /**
     * Construct the actor with empty URL and CE parameters and the runODC
     * parameter set to false.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @throws NameDuplicationException
     *             If the superclass or a parameter constructor reports a name
     *             clash.
     * @throws IllegalActionException
     *             If the superclass or a parameter constructor rejects the
     *             arguments.
     */
    public OpendapDataSourceODC(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        opendapURLParameter = new FileParameter(this, "opendapURLParameter");
        opendapCEParameter = new FileParameter(this, "opendapCEParameter");

        opendapURL = "";
        opendapCE = "";
        dapConnection = null;

        runODC = new Parameter(this, "runODC");
        runODC.setTypeEquals(ptolemy.data.type.BaseType.BOOLEAN);
        runODC.setExpression("false");
    }

    /**
     * React to a change of the URL, CE or runODC parameter.
     *
     * <p>
     * When the URL or CE differs from the last value seen and the URL is not
     * empty, the data source is contacted again and the output ports are
     * reconfigured. A change made while the URL is empty is only recorded,
     * because there is nothing to connect to. When runODC becomes true the ODC
     * browser is started in a separate thread and the parameter is reset.
     * Any other attribute is passed to the superclass.
     * </p>
     *
     * @param attribute
     *            The changed Attribute.
     * @throws ptolemy.kernel.util.IllegalActionException
     *             If the data source cannot be accessed or the ports cannot be
     *             configured.
     */
    @Override
    public void attributeChanged(ptolemy.kernel.util.Attribute attribute)
            throws ptolemy.kernel.util.IllegalActionException {

        log.debug("attributeChanged() start.");

        if (attribute == opendapURLParameter || attribute == opendapCEParameter) {

            String url = opendapURLParameter.getExpression();
            String ce = opendapCEParameter.getExpression();

            if (attribute == opendapURLParameter) {
                log.debug("--- attributeChanged() url: " + url
                        + " Current URL: " + opendapURL);
            }
            if (attribute == opendapCEParameter) {
                log.debug("--- attributeChanged() ce: \"" + ce
                        + "\" Current CE: \"" + opendapCE + "\"");
            }

            boolean changed = false;
            if (!opendapURL.equals(url)) {
                opendapURL = url;
                changed = true;
            }
            if (!opendapCE.equals(ce)) {
                opendapCE = ce;
                changed = true;
            }

            // Only reload when there is a URL to talk to. Reloading on a CE
            // change with an empty URL can only fail, and would abort the
            // "clear, then set" sequence used by ODCThread.
            if (changed && !opendapURL.equals("")) {
                reloadDataSource();
            }

        }
        // *** Remove the ODC option. jhrg
        else if (attribute == runODC) {
            BooleanToken token = (BooleanToken) runODC.getToken();
            if (token.booleanValue()) {
                // start ODC in separate thread
                ODCThread odcThread = new ODCThread();
                odcThread.start();
                runODC.setExpression("false");
            }
        } else {
            super.attributeChanged(attribute);
        }
    }

    /**
     * Open a connection for the current URL, fetch the DDS for the current CE,
     * squeeze its arrays and reconfigure the output ports to match it.
     *
     * @throws IllegalActionException
     *             If any step fails; the original exception is kept as the
     *             cause.
     */
    private void reloadDataSource() throws IllegalActionException {
        try {
            log.debug("OPeNDAP URL: " + opendapURL);
            dapConnection = new DConnect2(opendapURL);

            DDS dds = dapConnection.getDDS(opendapCE);
            log.debug("Got DDS.");

            log.debug("Squeezing arrays.");
            squeezeArrays(dds);

            log.debug("Configuring ports.");
            configureOutputPorts(dds);

        } catch (Exception e) {
            log.error("Problem accessing OPeNDAP Data Source: " + opendapURL, e);
            throw new IllegalActionException(this, e, "Problem accessing "
                    + "OPeNDAP Data Source: " + e.getMessage());
        }
    }

    /**
     * Thread that runs the ODC browser and, when it returns a URL, copies the
     * URL and CE into this actor's parameters.
     */
    // *** Remove. jhrg
    private class ODCThread extends Thread {
        ODCThread() {
            super();
        }

        @Override
        public void run() {

            String keplerprop = System.getProperty("KEPLER");
            if (keplerprop == null) {
                keplerprop = ".";
            }

            String configDir = keplerprop + OPENDAP_CONFIG_DIR;

            DodsURL[] urls = ApplicationController
                    .blockingMain(new String[] { configDir });

            // Nothing selected (or nothing returned): leave the actor as is.
            if (urls == null || urls.length == 0) {
                return;
            }

            if (urls.length > 1) {
                // XXX how is this case possible?
                log.warn("More than one URL returned from ODC: "
                        + urls.length);
            }

            String odcUrl = urls[0].getFullURL();

            // Split "url?ce" into its two parts; the CE part is optional.
            String urlStr;
            String ceStr = null;
            int index = odcUrl.indexOf("?");
            if (index == -1) {
                urlStr = odcUrl;
            } else {
                urlStr = odcUrl.substring(0, index);
                ceStr = odcUrl.substring(index + 1);
            }

            log.debug("ODC sent url = " + urlStr);

            try {
                // first clear the old URL and CE
                // NOTE: setting a new URL before clearing old CE
                // will cause a reload, which may not work since
                // the CE could refer to fields not present in the
                // new URL.
                opendapURLParameter.setToken(new StringToken(""));
                opendapCEParameter.setToken(new StringToken(""));

                opendapURLParameter.setToken(new StringToken(urlStr));
                if (ceStr != null) {
                    opendapCEParameter.setToken(new StringToken(ceStr));
                    log.debug("ODC sent ce = " + ceStr);
                }
            } catch (IllegalActionException e) {
                log.error(e);
            }

            // queue a dummy change request that will cause
            // the gui to show the new output ports.
            String buffer = "<group>\n</group>";
            MoMLChangeRequest request = new MoMLChangeRequest(this,
                    getContainer(), buffer);
            request.setPersistent(false);
            requestChange(request);
        }
    }

    /**
     * Delegate to the superclass and log the call.
     *
     * @throws IllegalActionException
     *             If the superclass throws it.
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();
        log.debug("--- preintitialize");
    }

    /**
     * Delegate to the superclass and log the call.
     *
     * @throws IllegalActionException
     *             If the superclass throws it.
     */
    @Override
    public void initialize() throws IllegalActionException {
        super.initialize();
        log.debug("--- intitialize");
    }

    /**
     * Make sure a connection object exists for the current URL. A failure to
     * create it is logged, not thrown; {@link #fire()} then produces no data.
     *
     * @return Always true.
     * @throws IllegalActionException
     *             If the superclass throws it.
     */
    @Override
    public boolean prefire() throws IllegalActionException {
        // NOTE(review): the result of super.prefire() is ignored here, as in
        // the original code -- confirm that this is intended.
        super.prefire();

        log.debug("--- prefire");

        try {
            if (dapConnection == null) {
                log.debug("OPeNDAP URL: " + opendapURL);
                dapConnection = new DConnect2(opendapURL);
            }
        } catch (Exception e) {
            log.error("prefire Failed: ", e);
        }

        return true;
    }

    /**
     * Read the data selected by the CE (or by the wired output ports when the
     * CE is empty) and broadcast it on the output ports. Failures are logged
     * and no data is sent.
     *
     * @throws IllegalActionException
     *             If the superclass throws it.
     */
    @Override
    public void fire() throws IllegalActionException {
        super.fire();
        log.debug("\n\n\n--- fire");

        // prefire() logs, rather than throws, when the connection cannot be
        // created, so the connection may still be missing here.
        if (dapConnection == null) {
            log.error("fire() Failed: no OPeNDAP connection for URL: "
                    + opendapURL);
            return;
        }

        try {
            String ce = opendapCEParameter.getExpression();
            log.debug("Constraint Expression: " + ce);

            ce = createCEfromWiredPorts(ce);
            log.debug("Using CE: " + ce);

            DDS dds = dapConnection.getData(ce);

            log.debug("Squeezing arrays.");
            squeezeArrays(dds);

            log.debug("Broadcasting DAP data arrays.");
            broadcastDapData(dds);

        } catch (Exception e) {
            log.error("fire() Failed: ", e);
        }
    }

    /**
     * Delegate to the superclass and log the call.
     *
     * @return Always false.
     * @throws IllegalActionException
     *             If the superclass throws it.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        super.postfire();
        log.debug("--- postfire");
        return false;
    }

    /**
     * Build up the projection part of the constraint expression (CE) in order
     * to minimize the amount of data retrieved. If the CE is empty, then this
     * will build a list of projected variables based on which output ports are
     * wired. If the CE is not empty then it will not be modified.
     *
     * @param ce
     *            The current CE
     * @return The passed CE, unchanged, if it is not empty; otherwise a
     *         comma-separated list of the names of the wired output ports.
     * @exception IllegalActionException
     *                If the width of the wired ports cannot be calculated.
     */
    private String createCEfromWiredPorts(String ce)
            throws IllegalActionException {

        if (!ce.equals("")) {
            return ce;
        }

        StringBuilder projection = new StringBuilder();
        Iterator<?> ports = this.outputPortList().iterator();
        while (ports.hasNext()) {
            TypedIOPort port = (TypedIOPort) ports.next();
            // A width greater than zero means the port is wired.
            if (port.getWidth() > 0) {
                log.debug("Added " + port.getName() + " to projection.");
                if (projection.length() > 0) {
                    projection.append(",");
                }
                projection.append(port.getName());
            }
        }
        return projection.toString();
    }

    /**
     * Walks through the DDS, converts DAP data to ptII data, and broadcasts the
     * data onto the appropriate ports.
     *
     * @param dds
     *            The DDS from which to get the data to send
     * @throws IllegalActionException
     *             If a variable has no matching output port, or if mapping or
     *             broadcasting the data fails.
     */
    private void broadcastDapData(DDS dds) throws IllegalActionException {

        Enumeration<?> variables = dds.getVariables();
        while (variables.hasMoreElements()) {
            opendap.dap.BaseType bt = (opendap.dap.BaseType) variables
                    .nextElement();

            String columnName = bt.getLongName().trim();
            // Get the port associated with this DDS variable.
            TypedIOPort port = (TypedIOPort) this.getPort(columnName);
            if (port == null) {
                throw new IllegalActionException(
                        "Request Output Port Missing: " + columnName);
            }

            log.debug("Translating data.");

            // Map the DAP data for this variable into the ptII Token model.
            ptolemy.data.Token token = TokenMapper.mapDapObjectToToken(bt,
                    false);
            log.debug("Data Translated :");

            // Send the data.
            log.debug("Sending data.");
            port.broadcast(token);
            log.debug("Sent data.");
        }
    }

    /**
     * Probe a port
     *
     * @param port
     *            The port to probe.
     * @return The probe report.
     */
    public static String portInfo(TypedIOPort port) {

        String width;
        try {
            width = Integer.toString(port.getWidth());
        } catch (IllegalActionException ex) {
            width = "Failed to get width of port " + port.getFullName()
                    + ": " + ex;
        }

        String description;
        try {
            description = port.description();
        } catch (IllegalActionException ex) {
            description = "Failed to get the description of port "
                    + port.getFullName() + ": " + ex;
        }

        StringBuilder msg = new StringBuilder("Port Info: \n");
        msg.append(" getName(): ").append(port.getName()).append("\n");
        msg.append(" getWidth(): ").append(width).append("\n");
        msg.append(" isInput(): ").append(port.isInput()).append("\n");
        msg.append(" isOutput(): ").append(port.isOutput()).append("\n");
        msg.append(" isMultiport(): ").append(port.isMultiport()).append("\n");
        msg.append(" className(): ").append(port.getClassName()).append("\n");
        msg.append(" getDisplayName(): ").append(port.getDisplayName())
                .append("\n");
        msg.append(" getElementName(): ").append(port.getElementName())
                .append("\n");
        msg.append(" getFullName(): ").append(port.getFullName()).append("\n");
        msg.append(" getSource(): ").append(port.getSource()).append("\n");
        msg.append(" description(): ").append(description).append("\n");
        msg.append(" toString(): ").append(port).append("\n");

        return msg.toString();
    }

    /**
     * Configure the output ports to expose all of the variables at the top
     * level of the (potentially constrained) DDS. Output ports that do not
     * correspond to a variable are removed.
     *
     * @param dds
     *            The DDS
     * @throws IllegalActionException
     *             If a type cannot be mapped or a port cannot be removed or
     *             created.
     */
    private void configureOutputPorts(DDS dds) throws IllegalActionException {

        Vector<Type> types = new Vector<Type>();
        Vector<String> names = new Vector<String>();

        Enumeration<?> variables = dds.getVariables();
        while (variables.hasMoreElements()) {
            opendap.dap.BaseType bt = (opendap.dap.BaseType) variables
                    .nextElement();
            types.add(TypeMapper.mapDapObjectToType(bt, false));
            // Ports are created and looked up under the trimmed name, so the
            // keep-list must use the trimmed name as well.
            names.add(bt.getLongName().trim());
        }

        removeOtherOutputPorts(names);

        for (int i = 0; i < names.size(); i++) {
            initializePort(names.get(i), types.get(i));
        }
    }

    /**
     * Set the type of the output port with the given (trimmed) name, creating
     * the port first if it does not exist yet.
     *
     * @param aPortName
     *            name of new port
     * @param aPortType
     *            Type of new port
     * @throws IllegalActionException
     *             If the port cannot be created, or if its name clashes with
     *             an existing name.
     */
    void initializePort(String aPortName, Type aPortType)
            throws IllegalActionException {
        try {
            String columnName = aPortName.trim();
            TypedIOPort port = (TypedIOPort) this.getPort(columnName);
            if (port == null) {
                // Create a new typed output port and add it to this actor.
                port = new TypedIOPort(this, columnName, false, true);
                new ptolemy.kernel.util.Attribute(port, "_showName");
                log.debug("Creating port [" + columnName + "]" + this);
            }
            port.setTypeEquals(aPortType);

        } catch (ptolemy.kernel.util.NameDuplicationException nde) {
            throw new IllegalActionException(this, nde,
                    "One or more attributes has the same name. Please correct this and try again.");
        }
    }

    /**
     * Remove every non-input port whose name is not in the given collection.
     *
     * @param nonRemovePortName
     *            The names of the ports to NOT remove.
     * @throws IllegalActionException
     *             If a port cannot be removed.
     */
    void removeOtherOutputPorts(Collection<?> nonRemovePortName)
            throws IllegalActionException {
        // Iterate over an array snapshot of portList() so that removing a
        // port cannot cause a ConcurrentModificationException.
        TypedIOPort[] snapshot = (TypedIOPort[]) this.portList().toArray(
                new TypedIOPort[0]);

        for (TypedIOPort port : snapshot) {
            if (port == null || port.isInput()) {
                continue;
            }
            if (!nonRemovePortName.contains(port.getName())) {
                removePort(port);
            }
        }
    }

    /**
     * Remove all output ports.
     *
     * @throws IllegalActionException
     *             If a port cannot be removed.
     */
    void removeAllOutputPorts() throws IllegalActionException {
        // Iterate over an array snapshot of portList() so that removing a
        // port cannot cause a ConcurrentModificationException.
        TypedIOPort[] snapshot = (TypedIOPort[]) this.portList().toArray(
                new TypedIOPort[0]);

        for (TypedIOPort port : snapshot) {
            if (port != null && port.isOutput()) {
                removePort(port);
            }
        }
    }

    /**
     * Detach one port from this actor by setting its container to null.
     *
     * @param port
     *            The port to remove.
     * @throws IllegalActionException
     *             If the removal fails; the original exception is kept as the
     *             cause.
     */
    private void removePort(TypedIOPort port) throws IllegalActionException {
        String currPortName = port.getName();
        try {
            port.setContainer(null);
        } catch (Exception ex) {
            throw new IllegalActionException(this, ex,
                    "Error removing port: " + currPortName);
        }
    }

    /**
     * Eliminates array dimensions whose size is 1 (and thus in practice don't
     * exist). Constructors, including those used as array templates, are
     * traversed recursively.
     *
     * @param dds
     *            The DDS to traverse and squeeze its member arrays.
     */
    public static void squeezeArrays(DConstructor dds) {

        Enumeration<?> variables = dds.getVariables();
        while (variables.hasMoreElements()) {
            opendap.dap.BaseType bt = (opendap.dap.BaseType) variables
                    .nextElement();

            if (bt instanceof DArray) {
                DArray array = (DArray) bt;
                log.debug("Squeezing array " + array.getTypeName() + " "
                        + array.getLongName() + ";");
                array.squeeze();

                // An array of constructors: squeeze the arrays nested inside
                // the element template too.
                opendap.dap.BaseType template = array.getPrimitiveVector()
                        .getTemplate();
                if (template instanceof DConstructor) {
                    squeezeArrays((DConstructor) template);
                }
            } else if (bt instanceof DConstructor) {
                squeezeArrays((DConstructor) bt);
            }
        }
    }
}