001/////////////////////////////////////////////////////////////////////////////
002// Copyright (c) 2009 OPeNDAP, Inc.
003// All rights reserved.
004// Permission is hereby granted, without written agreement and without
005// license or royalty fees, to use, copy, modify, and distribute this
006// software and its documentation for any purpose, provided that the above
007// copyright notice and the following two paragraphs appear in all copies
008// of this software.
009//
010// IN NO EVENT SHALL OPeNDAP BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
011// SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
012// THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF OPeNDAP HAS BEEN ADVISED
013// OF THE POSSIBILITY OF SUCH DAMAGE.
014//
015// OPeNDAP SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
016// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
017// PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS"
018// BASIS, AND OPeNDAP HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
019// UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020//
021// Author: Nathan David Potter  <ndp@opendap.org>
022// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
023//
024/////////////////////////////////////////////////////////////////////////////
025
026package org.kepler.dataproxy.datasource.opendap;
027
028import java.util.Collection;
029import java.util.Enumeration;
030import java.util.Iterator;
031import java.util.Vector;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035
036import opendap.clients.odc.ApplicationController;
037import opendap.clients.odc.DodsURL;
038import opendap.dap.DArray;
039import opendap.dap.DConnect2;
040import opendap.dap.DConstructor;
041import opendap.dap.DDS;
042import ptolemy.actor.TypedIOPort;
043import ptolemy.actor.lib.Source;
044import ptolemy.data.BooleanToken;
045import ptolemy.data.StringToken;
046import ptolemy.data.expr.FileParameter;
047import ptolemy.data.expr.Parameter;
048import ptolemy.data.type.Type;
049import ptolemy.kernel.CompositeEntity;
050import ptolemy.kernel.util.IllegalActionException;
051import ptolemy.kernel.util.NameDuplicationException;
052import ptolemy.moml.MoMLChangeRequest;
053
054/**
055 * The OPeNDAP actor reads data from OPeNDAP data sources (i.e. servers).
056 * 
057 * <h1>OPeNDAP Actor Overview</h1>
058 * 
059 * The OPeNDAP actor provides access to data served by any Data Access Protocol
060 * (DAP) 2.0 compatible data source. The actor takes as configuration parameters
061 * the URL to the data source and an optional constraint expression (CE). Based
062 * on the URL and optional CE, the actor configures its output ports to match
063 * the variables to be read from the data source.
064 * 
065 * <h2>More information about the OPeNDAP actor</h2>
066 * 
067 * The OPeNDAP actor reads data from a single DAP data server and provides that
068 * data as either a vector.matrix or array for processing by downstream elements
069 * in a Kepler workflow. Each DAP server provides (serves) many data sources and
070 * each of those data sources can be uniquely identified using a URL in a way
071 * that's similar to how pages are provided by a web server. For more
072 * information on the DAP and on OPeNDAP's software, see www.opendap.org.
073 * 
074 * <h3>Characterization of Data Sources</h3>
075 * 
076 * Data sources accessible using DAP 2.0 are characterized by a URL that
077 * references a both a specific data server and a data granule available from
078 * that server and a Constraint Expression that describes which variables to
079 * read from within the data granule. In addition to reading data from a
080 * granule, a DAP 2.0 server can provide two pieces of information about the
081 * granule: a description of all of its variables, their names and their data
082 * types; and a collection of 'attributes' which are bound to those variables.
083 * 
084 * <h3>Operation of the Actor</h3>
085 * 
086 * The actor must have a valid URL before it can provide any information (just
087 * as a file reader actor need to point toward a file to provide data). Given a
088 * URL and the optional CE, the OPeNDAP actor will interrogate that data source
089 * and configure its output ports.
090 * 
091 * <h3>Data Types Returned by the Actor</h3>
092 * 
093 * There are two broad classes of data types returned by the actor. First there
094 * are vectors, matrices and arrays. These correspond to one, two and N (&gt; 2)
095 * dimensional arrays. The distinction between the vector and matrix types and
096 * the N-dimensional array is that Kepler can operate on the vector and matrix
097 * types far more efficiently than the N-dimensional arrays. Many variables
098 * present in DAP data sources are of the N-dimensional array class and one way
099 * to work with these efficiently is to use the constraint expression to reduce
100 * the order of these data to one or two, thus causing the actor to store them
101 * in a vector or matrix.
102 * 
103 * <p>
104 * As an example, consider the FNOC1 data source available at test.opendap.org.
105 * The full URL for this is http://test.opendap.org/opendap/data/nc/fnoc1.nc. It
106 * contains a variable 'u' which has three dimensions. We can constrain 'u' so
107 * that it has only two dimensions when read into Kepler using the CE
108 * 'u[0][0:16][0:20]' which selects only the first element (index 0) for the
109 * first dimension while requesting all of the remaining elements for the second
110 * and third dimensions. The www.opendap.org has documentation about the CE
111 * syntax.
112 * </p>
113 * 
114 * <p>
115 * The second data type returned by the actor is a record. In reality, all DAP
116 * data sources are records but the actor automatically 'disassembles' the top
117 * most record since we know that's what the vast majority of users will want.
118 * However, some data sources contains nested hierarchies of records many levels
119 * deep. When dealing with those data sources you will need to use the Kepler
120 * record disassembler in your work flow.
121 * </p>
122 * 
123 * @author Nathan Potter
124 * @version $Id: OpendapDataSourceODC.java 33629 2015-08-24 22:43:10Z crawl $
125 * @since Kepler 1.0RC1
126 * @date Jul 17, 2007
127 */
128public class OpendapDataSourceODC extends Source {
129
130        static Log log;
131        static {
132                log = LogFactory
133                                .getLog("org.kepler.dataproxy.datasource.opendap.OpendapDataSource");
134        }
135
136        private static final String OPENDAP_CONFIG_DIR = "/configs/ptolemy/configs/kepler/opendap";
137
138        /**
139         * The OPeNDAP URL that identifies a (possibly constrained) dataset.
140         */
141        public FileParameter opendapURLParameter = null;
142
143        /**
144         * The OPeNDAP Constraint Expression used to sub sample the dataset.
145         */
146        public FileParameter opendapCEParameter = null;
147
148        // *** Remove. jhrg
149        public Parameter runODC = null;
150
151        private String opendapURL;
152        private String opendapCE;
153        private DConnect2 dapConnection;
154
155        public OpendapDataSourceODC(CompositeEntity container, String name)
156                        throws NameDuplicationException, IllegalActionException {
157                super(container, name);
158
159                opendapURLParameter = new FileParameter(this, "opendapURLParameter");
160                opendapCEParameter = new FileParameter(this, "opendapCEParameter");
161
162                opendapURL = "";
163                opendapCE = "";
164                dapConnection = null;
165
166                runODC = new Parameter(this, "runODC");
167                runODC.setTypeEquals(ptolemy.data.type.BaseType.BOOLEAN);
168                runODC.setExpression("false");
169        }
170
171        /**
172         * 
173         * @param attribute
174         *            The changed Attribute.
175         * @throws ptolemy.kernel.util.IllegalActionException
176         *             When bad things happen.
177         */
178        public void attributeChanged(ptolemy.kernel.util.Attribute attribute)
179                        throws ptolemy.kernel.util.IllegalActionException {
180
181                log.debug("attributeChanged() start.");
182
183                if (attribute == opendapURLParameter || attribute == opendapCEParameter) {
184
185                        String url = opendapURLParameter.getExpression();
186                        String ce = opendapCEParameter.getExpression();
187
188                        if (attribute == opendapURLParameter)
189                                log.debug("--- attributeChanged() url: " + url
190                                                + " Current URL: " + opendapURL);
191                        if (attribute == opendapCEParameter)
192                                log.debug("--- attributeChanged()  ce: \"" + ce
193                                                + "\" Current CE: \"" + opendapCE + "\"");
194
195                        boolean reload = false;
196                        if (!opendapURL.equals(url)) {
197                                opendapURL = url;
198
199                                // only reload if not empty.
200                                if (!url.equals("")) {
201                                        reload = true;
202                                }
203                        }
204
205                        if (!opendapCE.equals(ce)) {
206                                opendapCE = ce;
207                                // *** I think this should test if url.equals(""). jhrg
208                                reload = true;
209                        }
210
211                        if (reload) {
212
213                                try {
214
215                                        log.debug("OPeNDAP URL: " + opendapURL);
216                                        dapConnection = new DConnect2(opendapURL);
217
218                                        DDS dds = dapConnection.getDDS(opendapCE);
219
220                                        log.debug("Got DDS.");
221                                        // dds.print(System.out);
222
223                                        log.debug("Squeezing arrays.");
224                                        squeezeArrays(dds);
225
226                                        // log.debug("Before ports configured.");
227                                        // dds.print(System.out);
228
229                                        log.debug("Configuring ports.");
230                                        configureOutputPorts(dds);
231
232                                        // log.debug("After ports configured.");
233                                        // dds.print(System.out);
234
235                                } catch (Exception e) {
236                                        e.printStackTrace();
237                                        throw new IllegalActionException("Problem accessing "
238                                                        + "OPeNDAP Data Source: " + e.getMessage());
239                                }
240
241                        }
242
243                }
244                // *** Remove the ODC option. jhrg
245                else if (attribute == runODC) {
246                        BooleanToken token = (BooleanToken) runODC.getToken();
247                        if (token.booleanValue()) {
248                                // start ODC in separate thread
249                                ODCThread tr = new ODCThread();
250                                tr.start();
251                                runODC.setExpression("false");
252                        }
253                }
254        }
255
256        // a thread to run ODC
257        // *** Remove. jhrg
258        private class ODCThread extends Thread {
259                ODCThread() {
260                        super();
261                }
262
263                public void run() {
264
265                        String keplerprop = System.getProperty("KEPLER");
266                        if (keplerprop == null) {
267                                keplerprop = ".";
268                        }
269
270                        String configDir = keplerprop + OPENDAP_CONFIG_DIR;
271
272                        DodsURL[] urls = ApplicationController
273                                        .blockingMain(new String[] { configDir });
274
275                        if (urls != null) {
276                                if (urls.length > 1) {
277                                        // XXX how is this case possible?
278                                        log.warn("More than one URL returned from ODC: "
279                                                        + urls.length);
280                                }
281
282                                String odcUrl = urls[0].getFullURL();
283
284                                String urlStr = null;
285                                String ceStr = null;
286                                int index = odcUrl.indexOf("?");
287                                if (index == -1) {
288                                        urlStr = odcUrl;
289                                } else {
290                                        urlStr = odcUrl.substring(0, index);
291                                        ceStr = odcUrl.substring(index + 1);
292                                }
293
294                                log.debug("ODC sent url = " + urlStr);
295
296                                try {
297                                        // first clear the old URL and CE
298                                        // NOTE: setting a new URL before clearing old CE
299                                        // will cause a reload, which may not work since
300                                        // the CE could refer to fields not present in the
301                                        // new URL.
302                                        opendapURLParameter.setToken(new StringToken(""));
303                                        opendapCEParameter.setToken(new StringToken(""));
304
305                                        opendapURLParameter.setToken(new StringToken(urlStr));
306                                        if (ceStr != null) {
307                                                opendapCEParameter.setToken(new StringToken(ceStr));
308                                                log.debug("ODC sent ce = " + ceStr);
309                                        }
310                                } catch (IllegalActionException e) {
311                                        log.error(e);
312                                }
313
314                                // queue a dummy change request that will cause
315                                // the gui to show the new output ports.
316                                String buffer = "<group>\n</group>";
317                                MoMLChangeRequest request = new MoMLChangeRequest(this,
318                                                getContainer(), buffer);
319                                request.setPersistent(false);
320                                requestChange(request);
321                        }
322                }
323        }
324
325        public void preinitialize() throws IllegalActionException {
326
327                super.preinitialize();
328                log.debug("--- preintitialize");
329
330        }
331
332        public void initialize() throws IllegalActionException {
333
334                super.initialize();
335                log.debug("--- intitialize");
336
337        }
338
339        public boolean prefire() throws IllegalActionException {
340                super.prefire();
341
342                log.debug("--- prefire");
343
344                try {
345
346                        if (dapConnection == null) {
347                                log.debug("OPeNDAP URL: " + opendapURL);
348                                dapConnection = new DConnect2(opendapURL);
349                        }
350
351                } catch (Exception e) {
352                        log.error("prefire Failed: ", e);
353                }
354
355                return true;
356        }
357
358        public void fire() throws IllegalActionException {
359                super.fire();
360                log.debug("\n\n\n--- fire");
361
362                try {
363                        String ce = opendapCEParameter.getExpression();
364                        log.debug("Constraint Expression: " + ce);
365
366                        ce = createCEfromWiredPorts(ce);
367
368                        log.debug("Using CE: " + ce);
369
370                        DDS dds = dapConnection.getData(ce);
371                        // log.debug("fire(): dapConnection.getData(ce) returned DataDDS:");
372                        // dds.print(System.out);
373
374                        log.debug("Squeezing arrays.");
375                        squeezeArrays(dds);
376
377                        log.debug("Broadcasting DAP data arrays.");
378                        broadcastDapData(dds);
379
380                        // log.debug("fire(): After data broadcast:");
381                        // dds.print(System.out);
382
383                } catch (Exception e) {
384                        log.error("fire() Failed: ", e);
385
386                }
387        }
388
389        public boolean postfire() throws IllegalActionException {
390
391                super.postfire();
392                log.debug("--- postfire");
393
394                return false;
395
396        }
397
398        /**
399         * Build up the projection part of the constraint expression (CE) in order
400         * to minimize the amount of data retrieved. If the CE is empty, then this
401         * will build a list of projected variables base on which output ports are
402         * wired. If the CE is not empty then it will not be modified.
403         * 
404         * @param ce
405         *            The current CE
406         * @return A new CE if the passed one is not empty, a new one corresponding
407         *         to the wired output ports otherwise.
408         * @exception IllegalActionException
409         *                If the width of the wired ports cannot be calculated.
410         */
411        private String createCEfromWiredPorts(String ce)
412                        throws IllegalActionException {
413
414                if (ce.equals("")) {
415
416                        // Get the port list
417                        Iterator i = this.outputPortList().iterator();
418
419                        String projection = "";
420                        int pcount = 0;
421                        while (i.hasNext()) {
422                                TypedIOPort port = (TypedIOPort) i.next();
423                                if (port.getWidth() > 0) {
424                                        log.debug("Added " + port.getName() + " to projection.");
425                                        if (pcount > 0)
426                                                projection += ",";
427                                        projection += port.getName();
428                                        pcount++;
429                                }
430                        }
431                        ce = projection;
432                }
433
434                return ce;
435
436        }
437
438        /**
439         * Walks through the DDS, converts DAP data to ptII data, and broadcasts the
440         * data onto the appropriate ports.
441         * 
442         * @param dds
443         *            The DDS from which to get the data to send
444         * @throws IllegalActionException
445         *             When bad things happen.
446         */
447        private void broadcastDapData(DDS dds) throws IllegalActionException {
448
449                // log.debug("broadcastDapData(): DataDDS prior to broadcast:");
450                // dds.print(System.out);
451
452                Enumeration e = dds.getVariables();
453                while (e.hasMoreElements()) {
454                        opendap.dap.BaseType bt = (opendap.dap.BaseType) e.nextElement();
455
456                        String columnName = bt.getLongName().trim();
457                        // Get the port associated with this DDS variable.
458                        TypedIOPort port = (TypedIOPort) this.getPort(columnName);
459                        if (port == null) {
460                                throw new IllegalActionException(
461                                                "Request Output Port Missing: " + columnName);
462                        }
463
464                        log.debug("Translating data.");
465                        // bt.printDecl(System.out);
466
467                        // Map the DAP data for this variable into the ptII Token model.
468                        ptolemy.data.Token token = TokenMapper.mapDapObjectToToken(bt,
469                                        false);
470                        log.debug("Data Translated :");
471                        // bt.printDecl(System.out);
472
473                        // Send the data.
474                        log.debug("Sending data.");
475                        port.broadcast(token);
476                        log.debug("Sent data.");
477
478                }
479
480        }
481
482        /**
483         * Probe a port
484         * 
485         * @param port
486         *            The port to probe.
487         * @return The probe report.
488         */
489        public static String portInfo(TypedIOPort port) {
490
491                String width = "";
492                try {
493                        width = Integer.valueOf(port.getWidth()).toString();
494                } catch (IllegalActionException ex) {
495                        width = "Failed to get width of port " + port.getFullName() + ex;
496                }
497
498                String description = "";
499                try {
500                        description = port.description();
501                } catch (IllegalActionException ex) {
502                        description = "Failed to get the description of port "
503                                        + port.getFullName() + ": " + ex;
504                }
505                String msg = "Port Info: \n";
506
507                msg += "    getName():         " + port.getName() + "\n";
508                msg += "    getWidth():        " + width + "\n";
509                msg += "    isInput():         " + port.isInput() + "\n";
510                msg += "    isOutput():        " + port.isOutput() + "\n";
511                msg += "    isMultiport():     " + port.isMultiport() + "\n";
512                msg += "    className():       " + port.getClassName() + "\n";
513                msg += "    getDisplayName():  " + port.getDisplayName() + "\n";
514                msg += "    getElementName():  " + port.getElementName() + "\n";
515                msg += "    getFullName():     " + port.getFullName() + "\n";
516                msg += "    getSource():       " + port.getSource() + "\n";
517                msg += "    description():     " + description + "\n";
518                msg += "    toString():        " + port + "\n";
519
520                return msg;
521
522        }
523
524        /**
525         * Configure the output ports to expose all of the variables at the top
526         * level of the (potentially constrained) DDS.
527         * 
528         * @param dds
529         *            The DDS
530         * @throws IllegalActionException
531         *             When bad things happen.
532         */
533        private void configureOutputPorts(DDS dds) throws IllegalActionException {
534
535                Vector<Type> types = new Vector<Type>();
536                Vector<String> names = new Vector<String>();
537
538                Enumeration e = dds.getVariables();
539                while (e.hasMoreElements()) {
540                        opendap.dap.BaseType bt = (opendap.dap.BaseType) e.nextElement();
541                        types.add(TypeMapper.mapDapObjectToType(bt, false));
542                        names.add(bt.getLongName());
543                }
544
545                removeOtherOutputPorts(names);
546
547                Iterator ti = types.iterator();
548                Iterator ni = names.iterator();
549
550                while (ti.hasNext() && ni.hasNext()) {
551                        Type type = (Type) ti.next();
552                        String name = (String) ni.next();
553                        initializePort(name, type);
554                }
555
556        }
557
558        /**
559         * Add a new port.
560         * 
561         * @param aPortName
562         *            name of new port
563         * @param aPortType
564         *            Type of new port
565         * @throws IllegalActionException
566         *             When bad things happen.
567         */
568        void initializePort(String aPortName, Type aPortType)
569                        throws IllegalActionException {
570                try {
571                        String columnName = aPortName.trim();
572                        // Create a new port for each Column in the resultset
573                        TypedIOPort port = (TypedIOPort) this.getPort(columnName);
574                        boolean aIsNew = (port == null);
575                        if (aIsNew) {
576                                // Create a new typed port and add it to this container
577                                port = new TypedIOPort(this, columnName, false, true);
578                                new ptolemy.kernel.util.Attribute(port, "_showName");
579                                log.debug("Creating port [" + columnName + "]" + this);
580                        }
581                        port.setTypeEquals(aPortType);
582
583                } catch (ptolemy.kernel.util.NameDuplicationException nde) {
584                        throw new IllegalActionException(
585                                        "One or more attributes has the same name.  Please correct this and try again.");
586                }
587
588        }
589
590        /**
591         * Remove all ports which's name is not in the selected vector
592         * 
593         * @param nonRemovePortName
594         *            The ports to NOT remove.
595         * @throws IllegalActionException
596         *             When bad things happen.
597         */
598        void removeOtherOutputPorts(Collection nonRemovePortName)
599                        throws IllegalActionException {
600                // Use toArray() to make a deep copy of this.portList().
601                // Do this to prevent ConcurrentModificationExceptions.
602                TypedIOPort[] l = new TypedIOPort[0];
603                l = (TypedIOPort[]) this.portList().toArray(l);
604
605                for (TypedIOPort port : l) {
606                        if (port == null || port.isInput()) {
607                                continue;
608                        }
609                        String currPortName = port.getName();
610                        if (!nonRemovePortName.contains(currPortName)) {
611                                try {
612                                        port.setContainer(null);
613                                } catch (Exception ex) {
614                                        throw new IllegalActionException(this,
615                                                        "Error removing port: " + currPortName);
616                                }
617                        }
618                }
619        }
620
621        /**
622         * Remove all ports.
623         * 
624         * @throws IllegalActionException
625         *             When bad things happen.
626         */
627        void removeAllOutputPorts() throws IllegalActionException {
628                // Use toArray() to make a deep copy of this.portList().
629                // Do this to prevent ConcurrentModificationExceptions.
630                TypedIOPort[] ports = new TypedIOPort[0];
631                ports = (TypedIOPort[]) this.portList().toArray(ports);
632
633                for (TypedIOPort port : ports) {
634                        if (port != null && port.isOutput()) {
635                                String currPortName = port.getName();
636                                try {
637                                        port.setContainer(null);
638                                } catch (Exception ex) {
639                                        throw new IllegalActionException(this,
640                                                        "Error removing port: " + currPortName);
641                                }
642                        }
643
644                }
645        }
646
647        /**
648         * Eliminates array dimensions whose dimensions are 1 (and thus in practice
649         * don't exisit)
650         * 
651         * @param dds
652         *            The DDS to traverse and squeeze its member arrays.
653         */
654        public static void squeezeArrays(DConstructor dds) {
655
656                DArray a;
657
658                Enumeration e = dds.getVariables();
659                while (e.hasMoreElements()) {
660                        opendap.dap.BaseType bt = (opendap.dap.BaseType) e.nextElement();
661
662                        if (bt instanceof DArray) {
663                                a = (DArray) bt;
664                                log.debug("Squeezing array " + a.getTypeName() + " "
665                                                + a.getLongName() + ";");
666                                a.squeeze();
667                                // System.out.print("Post squeezing: ");
668                                // a.printDecl(System.out);
669                                bt = a.getPrimitiveVector().getTemplate();
670                                if (bt instanceof DConstructor)
671                                        squeezeArrays((DConstructor) bt);
672                        } else if (bt instanceof DConstructor) {
673                                squeezeArrays((DConstructor) bt);
674                        }
675
676                }
677        }
678}