Tuesday 26 September 2017

BPEL Correlation for Asynch Invocation

Background
HTTP is stateless by nature and therefore provides no support for conversations that require the exchange of multiple messages. With synchronous interactions this is not an issue, because the response to a particular request is returned in the HTTP response. With asynchronous interactions, however, it is a more serious limitation. Take the use case below.

Use case
A BPEL process "A" calls an asynchronous process "B", and Process B returns the result to Process A through a callback after a significant amount of time. Assume multiple instances of Process A and Process B are running at the same time. Once the message has been routed to the correct service endpoint, how does the service engine at that endpoint know which instance of "A" should receive the response from "B"? For situations like this, where WS-Addressing does not work, BPEL provides a feature called "Correlation".

Correlation in BPEL

The key components in defining correlation are: 1) correlation sets, 2) correlation properties, and 3) correlation aliases.

A correlation set is a kind of pointer/setup that groups correlation properties and their associated correlation aliases. That means that if an engineer assigns a correlation set to a process, all the underlying correlation properties and aliases are available to that process.

Correlation sets can be attached at the process level or limited to a particular scope. In the majority of integrations, correlation sets are attached at the process level.

Let's take an example:
A client application calls ProcessA with a message:
<customer>
   <ssn>xxx</ssn>
   <address></address>
</customer>

ProcessA calls ProcessB with the same customer message.

ProcessB gets or calculates the customer's credit score and returns it to ProcessA in the message format below:
<customer>
  <ssn>xxxx</ssn>
  <creditscore>xxx</creditscore>
</customer>

In this scenario, if a correlation set is defined, the BPEL service engine performs the following steps internally:

1) When ProcessA invokes ProcessB, the BPEL service engine serializes (dehydrates) the state of that particular ProcessA instance into the database.

2) If you compare this database to a hash table, the hash value is the instance/state of ProcessA, and the hash key (here the SSN) is identified through the correlation set (correlation property -> correlation alias) attached to the INVOKE activity.

3) Once ProcessB returns the credit score, the BPEL service engine holds the return message, identifies the key in that message through the correlation set defined on the RECEIVE activity (correlation property -> correlation alias), and finds the matching ProcessA instance in the database (the hash table from step 2) to hand the return message over to.
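
The hash-table analogy in the three steps above can be sketched in a few lines of Java. This is only a toy model for illustration, not how the BPEL engine is implemented: the class names, the in-memory map standing in for the dehydration store, and the sample SSN values are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the correlation lookup; every name here is hypothetical.
class CorrelationStoreSketch {

    // Stand-in for a dehydrated ProcessA instance.
    static final class ProcessAInstance {
        final String instanceId;
        String creditScore; // filled in when the callback arrives

        ProcessAInstance(String instanceId) {
            this.instanceId = instanceId;
        }
    }

    // Stand-in for the dehydration store: correlation key (SSN) -> waiting instance.
    private final Map<String, ProcessAInstance> waiting = new HashMap<>();

    // Steps 1 and 2: on INVOKE, park the instance under the key taken from the request.
    void onInvoke(String ssn, ProcessAInstance instance) {
        waiting.put(ssn, instance);
    }

    // Step 3: on RECEIVE, use the key from the callback to find and resume the instance.
    ProcessAInstance onCallback(String ssn, String creditScore) {
        ProcessAInstance instance = waiting.remove(ssn);
        if (instance == null) {
            throw new IllegalStateException("No waiting instance for key " + ssn);
        }
        instance.creditScore = creditScore;
        return instance;
    }

    public static void main(String[] args) {
        CorrelationStoreSketch store = new CorrelationStoreSketch();
        store.onInvoke("111-11-1111", new ProcessAInstance("A-1"));
        store.onInvoke("222-22-2222", new ProcessAInstance("A-2"));

        // Callbacks may arrive in any order; the key selects the right instance.
        ProcessAInstance resumed = store.onCallback("222-22-2222", "710");
        System.out.println(resumed.instanceId + " received score " + resumed.creditScore);
    }
}
```

The point of the sketch is that the key must be derivable from both the outgoing request and the incoming callback, which is exactly what the two aliases configure.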
In detail (with screen shots):
1) The "calling" synchronous process invokes the "called" asynchronous process.



2) Create a correlation set (at process level).
To do this, open the calling BPEL process -> open the Invoke activity -> go to the "Correlations" tab -> click the + sign in the correlations tab form -> click "..." -> click the + sign -> provide a correlation set name.
3) Create a correlation property.
To do this, click the + sign on the correlation set screen -> highlight Properties -> click + -> provide a property name -> provide the type (string) -> click OK.
4) Now create an alias (for the "request message").
To do this, click the + sign in the "Alias" tab of the property form -> select the "unique identifier" of the request (which is the input message to the "sub BPEL process").
Make sure that the "Query" field on that screen has an XPath expression that points to the exact "Key" element.

That's it: you have created the mapping to find/capture the "Key" in the "input" message of the "called process".
5) Now attach the mapping (correlation set) to the RECEIVE activity.
6) Click the "receive" activity of the "calling process" (this activity receives the result from the "called process") -> go to the Correlations tab -> you can see the correlation set created in the steps above.
7) Select the predefined correlation set -> click the "edit" icon (which opens the correlation set in edit mode) -> click the edit icon for the correlation property (which opens the correlation property in edit mode) -> click "+" to create a new alias (the mapping) for the KEY of the "result message".
Make sure that the "Query" field on that screen has an XPath expression that points to the exact "Key" element. You are now all set with correlation in your integration; just test it.
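
To check that the two alias queries really point at the same key, it can help to evaluate them against sample messages outside the designer. The sketch below does that with the standard javax.xml.xpath API. The query "/customer/ssn" and the sample values are assumptions based on the example messages in this post; real alias queries depend on your message parts and namespaces.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Evaluates the request and response alias queries against sample messages.
class CorrelationAliasSketch {

    // Assumed alias queries: both point at the SSN element of their message.
    static final String REQUEST_ALIAS_QUERY = "/customer/ssn";
    static final String RESPONSE_ALIAS_QUERY = "/customer/ssn";

    // Returns the text of the node selected by the query ("" if nothing matches).
    static String extractKey(String xml, String query) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            return xpath.evaluate(query, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Bad alias query: " + query, e);
        }
    }

    public static void main(String[] args) {
        String request =
            "<customer><ssn>123-45-6789</ssn><address></address></customer>";
        String response =
            "<customer><ssn>123-45-6789</ssn><creditscore>710</creditscore></customer>";

        String requestKey = extractKey(request, REQUEST_ALIAS_QUERY);
        String responseKey = extractKey(response, RESPONSE_ALIAS_QUERY);

        // Correlation can only work if both aliases yield the same value.
        System.out.println("Keys match: " + requestKey.equals(responseKey));
    }
}
```

If either query returns an empty string for a real message, the alias is pointing at the wrong element and the callback would not find its instance.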

Oracle BPEL Stats/Metrics using custom Java code and dehydration tables

Background:

One of my clients required BPEL stats to be pulled without using standard OEM services, so I did some analysis to meet that requirement.

Analysis details:

(In my next post, I will provide some inputs on extracting/trapping OSB-based metrics. Stay tuned.)

Which BPEL process statuses should we consider when collecting metrics?
- Closed.Aborted
- Closed.Cancelled
- Closed.Completed
- Closed.Faulted
- Closed.Pending_Cancel
- Closed.stale
- Initiated
- Open.Faulted
- Open.Running
- Open.Suspended
- unknown

What are the composite state codes referenced in the Oracle SOA Java API?
Answer:
import oracle.soa.management.facade.CompositeInstance;

public class Lab {
    public static void main(String[] args) {
        // Print the numeric value behind each composite state constant.
        System.out.println(CompositeInstance.STATE_COMPLETED + " : State Completed");
        System.out.println(CompositeInstance.STATE_COMPLETED_SUCCESSFULLY + " : State Completed Successfully");
        System.out.println(CompositeInstance.STATE_FAULTED + " : State Faulted");
        System.out.println(CompositeInstance.STATE_IN_FLIGHT + " : State Inflight");
        System.out.println(CompositeInstance.STATE_RECOVERY_REQUIRED + " : State Recovery Required");
        System.out.println(CompositeInstance.STATE_RUNNING + " : State Running");
        System.out.println(CompositeInstance.STATE_STALE + " : State Stale");
        System.out.println(CompositeInstance.STATE_SUSPENDED + " : State Suspended");
        System.out.println(CompositeInstance.STATE_TERMINATED_BY_USER + " : State Terminated by User");
        System.out.println(CompositeInstance.STATE_UNKNOWN + " : State Unknown");
    }
}

2 : State Completed
2 : State Completed Successfully
3 : State Faulted
0 : State Inflight
1 : State Recovery Required
0 : State Running
6 : State Stale
5 : State Suspended
4 : State Terminated by User
-1 : State Unknown

Which states are available in SOA BPEL for BPEL processes?
Answer:
select distinct state, state_text
from ( 
 select cxci.state state,
 case cxci.state
 when 0 then 'initiated'
 when 1 then 'open.running'
 when 2 then 'open.suspended'
 when 3 then 'open.faulted'
 when 4 then 'closed.pending_cancel'
 when 5 then 'closed.completed'
 when 6 then 'closed.faulted'
 when 7 then 'closed.cancelled'
 when 8 then 'closed.aborted'
 when 9 then 'closed.stale'
 else 'unknown'
 end  as state_text
 from cube_instance cxci)
group by state, state_text;
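
If the raw state column is read by custom Java code instead of being decoded in SQL, the same mapping can be kept in one small helper. This is a minimal sketch that simply mirrors the CASE expression in the query above; the class and method names are made up for illustration.

```java
// Mirrors the CASE expression of the cube_instance query; names are hypothetical.
class CubeInstanceStateSketch {

    // Index = numeric state, value = label, in the same order as the query above.
    private static final String[] LABELS = {
        "initiated",             // 0
        "open.running",          // 1
        "open.suspended",        // 2
        "open.faulted",          // 3
        "closed.pending_cancel", // 4
        "closed.completed",      // 5
        "closed.faulted",        // 6
        "closed.cancelled",      // 7
        "closed.aborted",        // 8
        "closed.stale"           // 9
    };

    // Returns the label for a state code, or "unknown" for anything out of range.
    static String label(int state) {
        return (state >= 0 && state < LABELS.length) ? LABELS[state] : "unknown";
    }

    public static void main(String[] args) {
        for (int state = -1; state <= 10; state++) {
            System.out.println(state + " -> " + label(state));
        }
    }
}
```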


Which query can we use to collect metrics at the composite level (not at the individual BPEL process level)?
Answer:
select distinct composite_name, count(*)
from cube_instance
where cmpst_id != cikey and
componenttype = 'bpel'
group by composite_name
order by composite_name;

Which query can we use to collect metrics at the individual BPEL process level?
Answer:
select distinct composite_name, component_name, count(*)
from cube_instance
where cmpst_id = cikey and
componenttype = 'bpel'
group by composite_name, component_name
order by composite_name, component_name

When might these queries give wrong results?
Answer:
=Case 1: If BPEL-Process-A calls another BPEL-Process-B, the "idempotent" property on the invoke activity that calls Process-B can affect the results.
If idempotent="true", the instance is not stored in the dehydration store (by default idempotent = "true"). This property is usually set to "true" for EJB and WSIF invocations.
=Case 2: If the BPEL process is defined as "transient" using the properties below, that BPEL process is not stored in the dehydration tables.
<property name="bpel.config.inMemoryOptimization">true</property>
<property name="bpel.config.completionPersistPolicy">off</property>

What are the key out-of-the-box dehydration tables that EM and the dehydration API (Java POJOs) use to provide the metrics in the EM console?
Answer:
=Views: bpel_process_instances, bpel_faults_vw
=Tables: cube_instance, audit_trail, audit_details, invoke_message (many more tables exist)

What is the other approach to trap BPEL runtime instances?
Answer:
The BPEL PM Java APIs. The required JAR files are: orabpel.jar, ejb.jar, orabpel-common.jar, oc4j-internal.jar, optic.jar

Where can I find the above JAR files?
Answer:
//Oracle_SOA1/soa/modules/oracle.soa.bpel_xxx
/\Middleware\Oracle_SOA1\soa\modules\oracle.soa.fabric_11.1.1
\oracle_common\soa\modules\oracle.soa.mgmt_11.1.1


How to display all BPEL domains?
Answer:
Import packages:
import com.oracle.bpel.client.BPELDomainStatus;
import com.oracle.bpel.client.Server;
import com.oracle.bpel.client.auth.ServerAuth;
import com.oracle.bpel.client.auth.ServerAuthFactory;

Connection factory properties to connect to the BPEL WebLogic server:
Properties props = new java.util.Properties();
props.put("java.naming.factory.initial",
"com.evermind.server.rmi.RMIInitialContextFactory" );
props.put("java.naming.provider.url",
"opmn:ormi://phanikumar-vb:xxxx:home/orabpel" );
props.put("java.naming.security.principal", "weblogic" );
props.put("java.naming.security.credentials", "xxxxxx" );
String securityCredentials = "xxxxxx";

ServerAuth auth = ServerAuthFactory.authenticate(securityCredentials, null, props);
Server srv = new Server(auth);

BPELDomainStatus domainList[] = srv.getAllDomainStatus();
for (int i=0; i<domainList.length; i++ ) {
System.out.println( "Domain ID="+domainList[i].getDomainId() );
}

BEA-000402 : error : Maximum number of socket reader threads allowed by the configuration

Problem:
I received the following warning while running long-running POJOs that access WebLogic components through JNDI: <Warning> <Socket> <BEA-000402> <There are: 18 active sockets, but the maximum number of socket reader threads allowed by the configuration is: 2. You may want to alter your configuration.>

Solution:
Navigate to the WebLogic config.xml
Find the parameter: ThreadPoolPercentSocketReaders
Increase the value to anywhere between 70 and 90

Also navigate to weblogic.conf
Change the parameter -Dweblogic.PosixSocketReaders to a higher number (maybe 8 or 9)

Oracle BPEL using JMX and MBeans

About JMX:

JMX (Java Management Extensions) has been part of J2SE since version 5.0 and is meant for programmatically managing and monitoring application resources, service-oriented networks, and system objects. Refer to the wiki: http://en.wikipedia.org/wiki/Java_Management_Extensions

About MBeans:

MBeans, short for Managed Beans, are Java objects that represent resources in the JVM, in Java applications, and in application servers. So, with the combination of JMX (the Java programming APIs) and MBeans (the Java objects that represent resources such as transaction monitors, JDBC drivers, printers, and many more), we can collect statistics on performance, resource usage, and more for the resources running in a JVM. Refer to the wiki: http://en.wikipedia.org/wiki/Java_Management_Extensions#Managed_Bean
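
Before pointing JMX at a WebLogic server, the same two calls used throughout this post (queryNames and getAttribute) can be tried against the MBeans that every JVM exposes about itself. This is a minimal sketch that needs nothing beyond the JDK; the class and method names are made up, while "java.lang:type=Runtime" and its "VmName" attribute are standard platform MBean names.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Queries the JVM's own platform MBeans; no application server required.
class PlatformMBeanSketch {

    // Counts the MBeans whose names match a pattern such as "java.lang:*".
    static int countMBeans(MBeanServerConnection connection, String pattern) {
        try {
            return connection.queryNames(new ObjectName(pattern), null).size();
        } catch (JMException | IOException e) {
            throw new IllegalStateException("MBean query failed: " + pattern, e);
        }
    }

    // Reads one attribute from the standard Runtime MBean.
    static String vmName(MBeanServerConnection connection) {
        try {
            ObjectName runtime = new ObjectName("java.lang:type=Runtime");
            return (String) connection.getAttribute(runtime, "VmName");
        } catch (JMException | IOException e) {
            throw new IllegalStateException("Could not read VmName", e);
        }
    }

    public static void main(String[] args) {
        // The local MBeanServer is itself an MBeanServerConnection.
        MBeanServerConnection connection = ManagementFactory.getPlatformMBeanServer();
        System.out.println("java.lang MBeans: " + countMBeans(connection, "java.lang:*"));
        System.out.println("VM name: " + vmName(connection));
    }
}
```

Against a remote server only the way the MBeanServerConnection is obtained changes; the query and attribute calls stay the same.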

This blog is meant to give some pointers/APIs for fetching the details of resources (such as BPEL domains, BPEL servers, BPEL composites, BPEL composite configurations, etc.) hosted on a WebLogic application server. Have fun …

Topic

1)- Get BPEL Domains and Managed servers (WebLogic) using Java
2)- Get BPEL Composites hosted on Weblogic server
3)- Get BPEL Composite revisions
4)- Get all callable services (references) of a given BPEL Composite

1)- How to get BPEL Domains, managed servers, their restart dates and status :

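import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.naming.Context;

public class PPKBPELExtraction {
    // Make sure to define the properties below based on your BPEL domain/managed server setup.
    String host;
    String port;
    String userName;
    String pwd;
    String webserverHost;
    String defaultDomain;
    String bpel_listner_port;
    String bpel_listner_host;

    // General JMX connection state; mBeanServerConnection is used by the queries in sections 2 to 4.
    JMXServiceURL jmxServiceURL = null;
    Hashtable<String, String> jmxJNDIProperties = new Hashtable<String, String>();
    JMXConnector jmxConnector = null;
    MBeanServerConnection mBeanServerConnection = null;
    Hashtable<String, String> mBeanQueryStrings = new Hashtable<String, String>();

    // Separate connection to the domain runtime MBean server, used by getDomainDetails().
    String uriForDomainConfig = "/jndi/weblogic.management.mbeanservers.domainruntime";
    JMXConnector jmxConnectorForDomainConfig = null;
    MBeanServerConnection mBeanServerConnectionForDomainConfig = null;
    Hashtable<String, String> jmxJNDIPropertiesForDomainConfig = new Hashtable<String, String>();
    JMXServiceURL jmxServiceURLForDomainConfig = null;
    SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");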

public void getDomainDetails() {
       writeToLog(" getDomainDetails() => ");
              try {                                    
                     jmxServiceURLForDomainConfig = new JMXServiceURL("t3",this.host,Integer.parseInt(this.port),uriForDomainConfig);
                     jmxJNDIPropertiesForDomainConfig.put(Context.SECURITY_PRINCIPAL, this.userName);
                     jmxJNDIPropertiesForDomainConfig.put(Context.SECURITY_CREDENTIALS, this.pwd);
                     jmxJNDIPropertiesForDomainConfig.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "weblogic.management.remote");
                     jmxConnectorForDomainConfig = JMXConnectorFactory.connect(jmxServiceURLForDomainConfig, jmxJNDIPropertiesForDomainConfig);
                     mBeanServerConnectionForDomainConfig = jmxConnectorForDomainConfig.getMBeanServerConnection();
              } catch (Exception expGetDomainDetails) {
                     writeToLog("expGetDomainDetails : " + expGetDomainDetails.toString());
              }
              String queryString = "com.bea:Name=DomainRuntimeService,Type=weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean";
       ObjectName domainObject = null;
       ObjectName domainObjectName = null;
       String domainName = null;
       Date activationDate = null;
              ObjectName[] serverObjects;
              int length = 0;
              String serverName;
              String serverState;
        
       try {
              ObjectName objectName = new ObjectName(queryString);
              Set<ObjectName> objectNameSet =mBeanServerConnectionForDomainConfig.queryNames(objectName, null);
              Iterator<ObjectName> objectNameSetIterator = objectNameSet.iterator();
              while (objectNameSetIterator.hasNext()){
                     domainObject = objectNameSetIterator.next();
                     domainObjectName = (ObjectName) mBeanServerConnectionForDomainConfig.getAttribute(domainObject,"DomainRuntime");
                     domainName = (String) mBeanServerConnectionForDomainConfig.getAttribute(domainObjectName,"Name");
                     activationDate = (Date) mBeanServerConnectionForDomainConfig.getAttribute(domainObjectName, "ActivationTime");
                     serverObjects = (ObjectName[]) mBeanServerConnectionForDomainConfig.getAttribute(domainObject,"ServerRuntimes");
                     length = serverObjects.length;
                     Vector<Server> tmpServer = new Vector<Server>();
                     for (int i = 0; i < length; i++) {
                         serverName = (String) mBeanServerConnectionForDomainConfig.getAttribute(serverObjects[i], "Name");
                         serverState = (String) mBeanServerConnectionForDomainConfig.getAttribute(serverObjects[i],"State");
                         tmpServer.add(new Server(domainName,serverName,null,serverState));
                     }
                     this.domains.add(new Domain(domainName, activationDate,this.type,tmpServer));
                     this.defaultDomain = domainName;
                     tmpServer = null;
                     domainObject = null;
              } 
              objectNameSetIterator = null;
              objectNameSet = null;
       } catch (Exception expDomain) {
              this.domains.add(new Domain(this.defaultDomain, (new Date()) ,"",null)); // Incase admin server is down.
              writeToLog("expDomain : " + expDomain.toString());
              writeToLog("Looks like Adminserver is having issues,  so defaulting Admin server activation date with sysdate");
       }
            try {
                     if (jmxConnectorForDomainConfig != null) {
                            jmxConnectorForDomainConfig.close();
                     }
                     // Drop the references whether or not a connection was open.
                     jmxJNDIPropertiesForDomainConfig = null;
                     jmxConnectorForDomainConfig = null;
                     mBeanServerConnectionForDomainConfig = null;
              } catch (Exception expClose) {
                     writeToLog("expClose : " + expClose.toString());
              }
              writeToLog(" <= getDomainDetails()");
    } 
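
The connection setup inside getDomainDetails() can also be pulled into small helpers, which makes the URL and the JNDI environment easy to inspect before any connection is attempted. The sketch below is one possible arrangement, not a replacement for the method above: the class and method names are made up, and actually calling connect() assumes the WebLogic client classes that provide the "t3" protocol are on the classpath.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.naming.Context;

// Builds the pieces needed to reach the WebLogic domain runtime MBean server.
class DomainRuntimeConnectionSketch {

    static final String DOMAIN_RUNTIME_PATH =
        "/jndi/weblogic.management.mbeanservers.domainruntime";

    // Same URL shape as in getDomainDetails(): t3 protocol, admin host and port.
    static JMXServiceURL serviceUrl(String host, int port) {
        try {
            return new JMXServiceURL("t3", host, port, DOMAIN_RUNTIME_PATH);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad host or port: " + host + ":" + port, e);
        }
    }

    // Credentials plus the provider package that knows how to speak t3.
    static Map<String, Object> environment(String userName, String password) {
        Map<String, Object> env = new HashMap<>();
        env.put(Context.SECURITY_PRINCIPAL, userName);
        env.put(Context.SECURITY_CREDENTIALS, password);
        env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "weblogic.management.remote");
        return env;
    }

    // Opens the connection; only works with the WebLogic client JARs available.
    static JMXConnector connect(String host, int port, String userName, String password)
            throws IOException {
        return JMXConnectorFactory.connect(serviceUrl(host, port), environment(userName, password));
    }

    public static void main(String[] args) {
        // Host, port and user below are placeholders.
        System.out.println(serviceUrl("localhost", 7001));
        System.out.println(environment("weblogic", "secret").keySet());
    }
}
```

Because JMXConnector is Closeable, a caller can wrap connect(...) in a try-with-resources block so the connection is closed even when a query throws.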

2)- How to get BPEL composites hosted on a WebLogic managed server?

    public void fetchBPELComposite() {
        ObjectName compositeObjectName = null;
        String dn = null;
        Iterator<ObjectName> iterator = null;
        Set<ObjectName> queryObjectName = null;
        EndPoint endPoint = null;
        String compositeName = null;
        String revision = null;
        String compositeName1 = null;
       try {
            ObjectName queryObject = new ObjectName("oracle.soa.config:j2eeType=SCAComposite,Application=soa-infra,*");
            queryObjectName = mBeanServerConnection.queryNames(queryObject, null);
            iterator = queryObjectName.iterator();

            while (iterator.hasNext()) {
              compositeObjectName = iterator.next();
              dn = (String) mBeanServerConnection.getAttribute(compositeObjectName, "DN");
              compositeName = (String) mBeanServerConnection.getAttribute(compositeObjectName, "Name");
            }
       } catch (Exception expGetBPELComposite) {
              writeToLog("getBPELComposite() " + expGetBPELComposite.toString());
       } finally {
            compositeObjectName = null;
            dn = null;
            iterator = null;
            queryObjectName = null;
       }      
     }

3)- How to get the default revision of a BPEL Composite

=> In the API below, DN is the composite DN name (which you can obtain from Enterprise Manager).
       public boolean defaultRevision(String dn) {
              boolean returnValue= false;
              ObjectName queryObject = null;
              ObjectName soaInfra = null;
              Object[] vInputs = {dn};
              String[] vTypes = {String.class.getName()};
              
              try {
                     queryObject = new ObjectName("oracle.soa.config:name=soa-infra,j2eeType=CompositeLifecycleConfig,Application=soa-infra,*");
                     Set<ObjectName> objectNameSet = mBeanServerConnection.queryNames(queryObject, null);
                     Iterator<ObjectName> iterator = objectNameSet.iterator();
                     while (iterator.hasNext()) {
                           soaInfra = iterator.next();
                           returnValue = (Boolean) mBeanServerConnection.invoke(soaInfra, "isDefaultCompositeRevision", vInputs, vTypes);
                     }
              }catch (Exception expIsDefaultRevision ) {
                     returnValue = false;
                     writeToLog("expIsDefaultRevision : "+ expIsDefaultRevision.toString());
              }
              return returnValue;
       }
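
The invoke() call above takes the operation name, an array of arguments, and a matching array of parameter type names. Since isDefaultCompositeRevision only exists on a SOA server, the sketch below shows the same call shape against a standard platform MBean operation (getThreadInfo on "java.lang:type=Threading"), so it runs on any JVM. The class and method names are made up for illustration.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

// Shows the params/signature pairing that MBeanServerConnection.invoke() expects.
class MBeanInvokeSketch {

    // Asks the Threading MBean for the name of the calling thread.
    static String currentThreadName(MBeanServerConnection connection) {
        try {
            ObjectName threading = new ObjectName("java.lang:type=Threading");
            // One argument, so one entry in each array; primitives use their keyword as type name.
            Object[] params = { Thread.currentThread().getId() };
            String[] signature = { long.class.getName() };
            CompositeData info =
                (CompositeData) connection.invoke(threading, "getThreadInfo", params, signature);
            return (String) info.get("threadName");
        } catch (JMException | IOException e) {
            throw new IllegalStateException("invoke failed", e);
        }
    }

    public static void main(String[] args) {
        MBeanServerConnection connection = ManagementFactory.getPlatformMBeanServer();
        System.out.println("Thread name via JMX: " + currentThreadName(connection));
    }
}
```

The signature array is what selects the right overload, which is why defaultRevision() passes String.class.getName() for its single String argument.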

4)- How to get the services (references) called from given BPEL Composite :

       public void bPELEndSCAReferences(EndPoint pEndPoint, String pCompositeName, String pRevision) {         
       StringBuffer querySB = new StringBuffer("oracle.soa.config:j2eeType=SCAComposite.SCAReference.SCABinding,wsconfigtype=ServiceRefMappingPortInfoConfig,revision=").append(pRevision).append(",Application=soa-infra,SCAComposite=\"");
       querySB.append(pCompositeName+"\",*");
       String query = querySB.toString().trim();
       ObjectName on = null;
       String referenceName = null;
       String referenceWSDL = null;
       Vector<EndPoint> ref = new Vector<EndPoint>();
       EndPoint reference = null;
       try {
              ObjectName qn = new ObjectName (query);
              Set<ObjectName> objectSet = mBeanServerConnection.queryNames(qn, null);
              Iterator<ObjectName> iterator = null;
              iterator = objectSet.iterator();
            while ((iterator != null) && (iterator.hasNext())) {
                 on = iterator.next();
                 referenceName = (String) mBeanServerConnection.getAttribute(on, "PolicySubjectName");
                 referenceWSDL = (String) mBeanServerConnection.getAttribute(on, "Location");
                 reference = new EndPoint(this.defaultDomain, referenceName,null,true,"http","REFERENCE",referenceName,this.rand.nextLong());
                 reference.monitorURL = referenceWSDL;
                 //writeToLog("Domain " + this.defaultDomain +" Composite " + " Reference:" + referenceName +"; wsdl:"+ referenceWSDL);
                 ref.add(reference);
              }             
       } catch (Exception exp) {
              writeToLog("PPK Exception in displayBPELEndSCAReferences()" + exp.toString());
              writeToLog("query string : " + query);
              writeToLog("composite : " + pCompositeName + pRevision);
       }
       if ((pEndPoint.references != null) && (pEndPoint.references.size() > 0))
              pEndPoint.references.addAll(ref);
       else
           pEndPoint.references = ref;
    }
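
The query string in the method above wraps the composite name in double quotes by hand. As an alternative, ObjectName.quote() can do the quoting and escaping, and ObjectName.apply() lets you check locally which names the pattern would match. The sketch below is only an illustration of that idea: the class and method names, the sample composite "CreditCheck", and the extra "name" key in the sample MBean name are assumptions, not values taken from a real server.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Builds the SCA reference query pattern with ObjectName.quote().
class ReferenceQuerySketch {

    private static final String BASE =
        "oracle.soa.config:j2eeType=SCAComposite.SCAReference.SCABinding"
        + ",wsconfigtype=ServiceRefMappingPortInfoConfig";

    // Same keys as the hand-built query; quote() adds the surrounding double quotes.
    static ObjectName referenceQuery(String compositeName, String revision) {
        String query = BASE
            + ",revision=" + revision
            + ",Application=soa-infra"
            + ",SCAComposite=" + ObjectName.quote(compositeName)
            + ",*";
        return toObjectName(query);
    }

    // Checks locally whether a concrete MBean name would be matched by the pattern.
    static boolean matches(ObjectName pattern, String concreteName) {
        return pattern.apply(toObjectName(concreteName));
    }

    private static ObjectName toObjectName(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Bad object name: " + name, e);
        }
    }

    public static void main(String[] args) {
        ObjectName pattern = referenceQuery("CreditCheck", "1.0");
        // Hypothetical MBean name, shaped like the pattern plus one extra key.
        String sample = BASE + ",revision=1.0,Application=soa-infra"
            + ",SCAComposite=\"CreditCheck\",name=CreditService";
        System.out.println("Pattern: " + pattern);
        System.out.println("Matches sample: " + matches(pattern, sample));
    }
}
```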

Happy programming ...