High Performance OPC UA Server SDK  1.4.1.263
Accessing Asynchronous Data

This lesson explains how to implement access to data from an asynchronously connected device.

This is achieved by implementing custom service handler functions to provide the values.

Files used in this lesson:

Preliminary Note

The server is similar to Lesson 2: Custom Provider with Value Stores and provides three 32-bit unsigned integer variables from an arbitrary device. The difference lies in the implementation: in this lesson the service handlers are implemented directly instead of using a value store to access the device. Integrating a device this way is more complex, but it gives more freedom in the implementation and allows mass operations, asynchronous reads from the device, or direct handling of monitored items by the device.

Step 1: Initialize the Custom Provider and Create Nodes

Initialize the Custom Provider

The custom provider is again initialized by the init function in custom_provider.c, which is similar to Lesson 2. The custom store for accessing the device has been removed, but the memorystore is still there and contains the property values. The registered service handler functions have also changed to the functions that are implemented in this lesson:

#ifdef ENABLE_SERVICE_READ
ctx->read = custom_provider_read;
#endif
#ifdef ENABLE_SERVICE_WRITE
ctx->write = custom_provider_write;
#endif
#ifdef ENABLE_SERVICE_SUBSCRIPTION
ctx->add_item = custom_provider_add_item;
ctx->remove_item = custom_provider_remove_item;
ctx->modify_item = custom_provider_modify_item;
ctx->set_item_mode = custom_provider_set_item_mode;
ctx->subscribe = custom_provider_subscribe;
#endif

Create Nodes

Creating the nodes happens in custom_provider_nodes.c and is also very similar to Lesson 2. There is a function that creates the EURange property and inserts its value into the memorystore, and there is a function that creates the variable node for the device's values. The difference here is that the initial values of the device cannot be added to a store, but are written directly into the value array. Also, the store index must not be set, but the value index is still used to link the node to its value in the array:

ret = ua_variable_set_value_index(node, valueidx);
if (ret != 0) return UA_NODE_INVALID;
g_custom_provider_values[valueidx] = initial_value;

Step 2: Implement the Read Service Handler

The read service handler is implemented in custom_provider_read.c and consists of two functions. The first is a helper function that applies some checks and reads the value; it is also used to read values for monitored items. In detail, it needs to check the nodeclass, the access level, the access permissions and the indexrange. Then the value index is retrieved from the node, its range is checked and the value is copied from the value array to the result. Finally, the sourcetimestamp and servertimestamp must be added if requested:

void custom_provider_read_value(
ua_node_t node,
struct uasession_session *session,
struct ua_indexrange *index_range,
unsigned int num_ranges,
struct ua_datavalue *result)
{
unsigned int valueidx;
int ret;
UA_UNUSED(index_range);
/* only variable node has the value attribute */
if (ua_node_get_nodeclass(node) != UA_NODECLASS_VARIABLE) {
return;
}
/* check if the value is readable */
return;
}
/* check user access */
#ifdef UA_AUTHORIZATION_SUPPORT
result->status = ua_authorization_is_readable(node, &session->user_ctx);
if (result->status != 0) return;
#else
UA_UNUSED(session);
#endif
/* can't read with indexrange from a scalar */
if (num_ranges > 0) {
return;
}
/* get value index and check array bounds */
ret = ua_variable_get_value_index(node, &valueidx);
if (ret != 0 || valueidx >= countof(g_custom_provider_values)) {
return;
}
/* copy value in the result variant */
ua_variant_set_uint32(&result->value, g_custom_provider_values[valueidx]);
result->status = 0;
/* add requested timestamps */
switch (ts) {
break;
break;
break;
default:
break;
}
}
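The order of checks described for the helper can be illustrated without the SDK. The following is only a minimal sketch: `mock_node`, `mock_read_value`, `g_values` and the `SC_*` names are hypothetical stand-ins, and the statuscode chosen for each failed check is an assumption, not necessarily what the lesson's source returns. The numeric statuscode values and the CurrentRead bit follow the OPC UA specification.

```c
#include <stddef.h>
#include <stdint.h>

/* OPC UA statuscode values; the macro names are local to this sketch. */
#define SC_GOOD                  0x00000000u
#define SC_BAD_INTERNALERROR     0x80020000u
#define SC_BAD_ATTRIBUTEINVALID  0x80350000u
#define SC_BAD_INDEXRANGEINVALID 0x80360000u
#define SC_BAD_NOTREADABLE       0x803A0000u

#define ACCESS_CURRENTREAD 0x01u /* AccessLevel bit 0 */

/* Hypothetical stand-in for the SDK node handle. */
struct mock_node {
    int is_variable;          /* nodeclass check */
    uint8_t access_level;     /* access level bit mask */
    unsigned int value_index; /* index into the device value array */
};

/* Stand-in for the array that represents the device. */
static uint32_t g_values[3] = { 10, 20, 30 };

/* Applies the checks in the order described above and copies the value. */
static uint32_t mock_read_value(const struct mock_node *node,
                                unsigned int num_ranges,
                                uint32_t *out)
{
    /* only variable nodes have a value attribute */
    if (!node->is_variable) return SC_BAD_ATTRIBUTEINVALID;
    /* the value must be readable */
    if ((node->access_level & ACCESS_CURRENTREAD) == 0) return SC_BAD_NOTREADABLE;
    /* an indexrange cannot be applied to a scalar (statuscode is an assumption) */
    if (num_ranges > 0) return SC_BAD_INDEXRANGEINVALID;
    /* the value index must lie inside the value array */
    if (node->value_index >= sizeof(g_values) / sizeof(g_values[0])) {
        return SC_BAD_INTERNALERROR;
    }
    *out = g_values[node->value_index];
    return SC_GOOD;
}
```

User authorization and timestamp handling are left out of the sketch because they depend on SDK types.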

The second function is the actual read service handler. It is passed the uaprovider_read_ctx with the attached ua_readrequest and ua_readresponse as its parameter. It iterates over all nodes in the read request and skips all nodes that don't belong to the custom provider's namespace. Next, the node handle for the current nodeid is looked up and the indexrange is parsed. Now the value can actually be read. All non-value attributes are handled by uaserver_read_internal. For value attributes the handler checks whether the store index is set, which here means the value is in the memory store. In this case the value is also read using uaserver_read_internal, which reads the value from the store. If the store index is not set, the value must be read from the device by the helper function implemented above. When the loop is finished, uaserver_read_complete is called to tell the SDK that this provider has finished processing the request:

void custom_provider_read(struct uaprovider_read_ctx *ctx)
{
struct ua_readrequest *req = ctx->req;
struct ua_readresponse *res = ctx->res;
struct ua_indexrange range[2];
unsigned int num_ranges;
uint16_t ns_index;
uint8_t storeidx = 0;
ua_node_t node;
int i;
for (i = 0; i < req->num_nodes; i++) {
/* skip node from other namespace */
ns_index = req->nodes[i].node_id.nsindex;
continue;
}
/* lookup node handle */
node = ua_node_find(&req->nodes[i].node_id);
if (node == UA_NODE_INVALID) continue;
/* parse the indexrange */
num_ranges = countof(range);
res->results[i].status = ua_indexrange_parse(&req->nodes[i].index_range, range, &num_ranges);
continue;
}
} else {
num_ranges = 0;
}
if (req->nodes[i].attribute_id == UA_ATTRIBUTEID_VALUE) {
ua_variable_get_store_index(node, &storeidx);
if (storeidx != 0) {
/* read value from store */
uaserver_read_internal(node, ctx->session, req->ts, req->nodes[i].attribute_id, range, num_ranges, &res->results[i]);
} else {
/* read value from device */
custom_provider_read_value(node, ctx->session, req->ts, range, num_ranges, &res->results[i]);
}
} else {
/* read non-value attribute */
uaserver_read_internal(node, ctx->session, req->ts, req->nodes[i].attribute_id, range, num_ranges, &res->results[i]);
}
}
}

This implementation reads each value separately from memory, so it effectively behaves like the implementation from Lesson 2 that uses the value store. However, this approach is not limited to synchronous reads: it would also be possible to start a read operation for all values at the device and let the function return without calling uaserver_read_complete. When all operations have asynchronously returned from the device, the response can be created and uaserver_read_complete must be called so the SDK can finish processing the response.
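One possible way to track such an asynchronous read is a counter of outstanding device operations. The sketch below is SDK-independent, and all names in it (`async_read_job`, `async_read_start`, `async_read_done`) are hypothetical. In a real provider, the point where `async_read_done` returns 1 is where uaserver_read_complete would be called.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical bookkeeping for one read request; not part of the SDK. */
struct async_read_job {
    uint32_t *results; /* one result slot per requested node */
    size_t num_nodes;  /* number of result slots */
    size_t pending;    /* device operations still outstanding */
};

/* Called by the service handler before it returns without completing. */
static void async_read_start(struct async_read_job *job,
                             uint32_t *results, size_t num_nodes)
{
    job->results = results;
    job->num_nodes = num_nodes;
    job->pending = num_nodes;
}

/*
 * Called from the device layer whenever one value has arrived.
 * Returns 1 if this was the last outstanding operation, meaning the
 * caller should now finish the request, 0 if more results are expected,
 * and -1 for an invalid index or a job that is already finished.
 */
static int async_read_done(struct async_read_job *job,
                           size_t index, uint32_t value)
{
    if (index >= job->num_nodes || job->pending == 0) return -1;
    job->results[index] = value;
    job->pending--;
    return job->pending == 0 ? 1 : 0;
}
```

The results may arrive in any order. The sketch assumes each index is reported exactly once and does not add locking, so it is only suitable where the device callbacks run in the same thread as the server.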

Step 3: Implement the Write Service Handler

Implementing the write service handler is analogous to the read service handler, except that it is implemented in a single function. This function takes a uaprovider_write_ctx with the attached ua_writerequest and ua_writeresponse. Again it iterates through all nodes that are to be written and skips nodes from foreign namespaces. This write implementation only allows writing the value attribute, so all other attributes are skipped and only nodes of class variable are allowed. Writing with an indexrange is also not supported, as the example nodes hold scalars only. If the store index is set, the uaserver_write_internal function is used to write the value into the store and processing continues with the next node. If the store index is not set, the node belongs to the device and the access level and access permissions must be checked. Then the value index is retrieved from the node and checked to be within the bounds of the array. After checking the datatype of the value to write, it can finally be written to the value array representing the device. When all nodes are written, uaserver_write_complete must be called so the SDK can continue processing the response:

void custom_provider_write(struct uaprovider_write_ctx *ctx)
{
unsigned int valueidx;
struct ua_writerequest *req = ctx->req;
struct ua_writeresponse *res = ctx->res;
ua_node_t node;
uint16_t ns_index;
uint8_t storeidx = 0;
int32_t i;
int ret;
for (i = 0; i < req->num_nodes; i++) {
/* skip node from other namespace */
ns_index = req->nodes[i].node_id.nsindex;
continue;
}
/* skip non-value attributes */
if (req->nodes[i].attribute_id != UA_ATTRIBUTEID_VALUE) {
continue;
}
/* lookup node handle */
node = ua_node_find(&req->nodes[i].node_id);
if (node == UA_NODE_INVALID) continue;
/* check index range */
continue;
}
/* check nodeclass */
if (ua_node_get_nodeclass(node) != UA_NODECLASS_VARIABLE) {
continue;
}
/* if store index is set, use write_internal to write to store */
ua_variable_get_store_index(node, &storeidx);
if (storeidx != 0) {
node,
ctx->session,
NULL,
0,
&req->nodes[i].value);
continue;
}
/* write of status or timestamp is not supported */
if (req->nodes[i].value.status != 0 || req->nodes[i].value.server_timestamp != 0 || req->nodes[i].value.source_timestamp != 0) {
continue;
}
/* check if the value is writeable */
continue;
}
/* check user access permission */
#ifdef UA_AUTHORIZATION_SUPPORT
res->results[i] = ua_authorization_is_writable(node, &ctx->session->user_ctx);
if (res->results[i] != 0) continue;
#endif
/* get value index and check array bounds */
ret = ua_variable_get_value_index(node, &valueidx);
if (ret != 0 || valueidx >= countof(g_custom_provider_values)) {
continue;
}
/* check the datatype of the value to write */
if (req->nodes[i].value.value.type != UA_VT_UINT32) {
continue;
}
/* write new value */
g_custom_provider_values[valueidx] = req->nodes[i].value.value.value.ui32;
res->results[i] = 0;
}
}
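The device-side part of the write path (everything after the store index check) can be sketched in isolation. The types and names below (`mock_datavalue`, `mock_write_value`, `g_device`) are hypothetical, and the statuscode returned for each failed check is an assumption chosen from the OPC UA specification, not taken from the lesson's source.

```c
#include <stddef.h>
#include <stdint.h>

/* OPC UA statuscode values; the macro names are local to this sketch. */
#define SC_GOOD                  0x00000000u
#define SC_BAD_INTERNALERROR     0x80020000u
#define SC_BAD_NOTWRITABLE       0x803B0000u
#define SC_BAD_WRITENOTSUPPORTED 0x80730000u
#define SC_BAD_TYPEMISMATCH      0x80740000u

#define ACCESS_CURRENTWRITE 0x02u /* AccessLevel bit 1 */

enum mock_type { MOCK_TYPE_UINT32, MOCK_TYPE_DOUBLE };

/* Hypothetical, reduced stand-in for the value sent by the client. */
struct mock_datavalue {
    enum mock_type type;
    uint32_t ui32;
    uint32_t status;
    int64_t source_timestamp;
    int64_t server_timestamp;
};

/* Stand-in for the array that represents the device. */
static uint32_t g_device[3];

static uint32_t mock_write_value(uint8_t access_level,
                                 unsigned int value_index,
                                 const struct mock_datavalue *dv)
{
    /* writing status or timestamps is not supported */
    if (dv->status != 0 || dv->source_timestamp != 0 || dv->server_timestamp != 0) {
        return SC_BAD_WRITENOTSUPPORTED;
    }
    /* the value must be writable */
    if ((access_level & ACCESS_CURRENTWRITE) == 0) return SC_BAD_NOTWRITABLE;
    /* the value index must lie inside the device array */
    if (value_index >= sizeof(g_device) / sizeof(g_device[0])) {
        return SC_BAD_INTERNALERROR;
    }
    /* the device only holds UInt32 values */
    if (dv->type != MOCK_TYPE_UINT32) return SC_BAD_TYPEMISMATCH;
    g_device[value_index] = dv->ui32;
    return SC_GOOD;
}
```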

As in the read implementation, requests can also be processed asynchronously by starting the write operations in the service handler function and finishing later in another function that calls uaserver_write_complete when all write operations have completed.

Step 4: Implement the Monitored Item Service Handlers

This implementation of the monitored item service handlers uses a timer for each monitored item to poll the value at the sampling rate and inform the monitored item about the new value. If the underlying device has a better way of reporting data changes, it should be used. This implementation is therefore mainly about explaining the monitored item mechanism and showing what to do in the different service handler functions; there may be a better way to do it in your implementation.

Add a New Item

When a client requests to create monitored items, for each item a ua_monitoreditem struct is allocated and passed to the add item service handler. The handler function retrieves the node handle and session from the monitored item.

If a deadband filter is requested for the monitored item, the handler verifies that the value has a numeric datatype; otherwise the item must be rejected. The deadband filter itself is applied by the SDK when ua_monitoreditem_new_value is called.

If the deadband filter is of type percent deadband, the SDK needs to know the EURange of the value. The example uses ua_monitoreditem_node_read_eurange to find that property and read its value from the local address space. However, this assumes that the property does not change, so the monitored item is only created if the EURange property is not writeable. The low and high values of the EURange are set on the monitored item so they can be used for applying the deadband filter. Also note that percent deadband is only active if the CMake option UASERVER_SUPPORT_DEADBAND_PERCENT is enabled; otherwise the SDK rejects monitored items with percent deadband filters and this code can be omitted.
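To show why the EURange is needed, here is the percent deadband rule from the OPC UA specification as a small, SDK-independent sketch: a change is reported only if the absolute difference to the last reported value exceeds the given percentage of the range (high - low). The function name is hypothetical, and the SDK applies this filter itself, so the code is for illustration only.

```c
#include <stdbool.h>

/*
 * Returns true if the change from 'last' to 'current' exceeds a percent
 * deadband of 'percent' (0..100) relative to the EURange [low, high].
 */
static bool percent_deadband_exceeded(double last, double current,
                                      double percent,
                                      double low, double high)
{
    double threshold = (percent / 100.0) * (high - low);
    double diff = current - last;

    if (diff < 0.0) diff = -diff; /* absolute difference */
    return diff > threshold;
}
```

With an EURange of 0 to 200 and a deadband of 10 percent, the threshold is 20, so a change from 100 to 115 is filtered out while a change from 100 to 121 is reported.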

Next, the initial value to send to the client is read. If the statuscode of this value is bad, the provider does not create a monitored item and returns the statuscode; the SDK then deletes the ua_monitoreditem struct and sends the statuscode to the client as the result. However, there are a few bad statuscodes that still allow the monitored item to be created, because the value may become readable later on.
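The decision described above can be expressed as a small predicate. This is a sketch with hypothetical names; the three exceptions are the ones named in the comment of the lesson's add item handler (INDEXRANGENODATA, NOTREADABLE, USERACCESSDENIED), and the numeric values are the statuscodes defined by the OPC UA specification.

```c
#include <stdbool.h>
#include <stdint.h>

/* OPC UA statuscode values; the macro names are local to this sketch. */
#define SC_GOOD                 0x00000000u
#define SC_BAD_USERACCESSDENIED 0x801F0000u
#define SC_BAD_NODEIDUNKNOWN    0x80340000u
#define SC_BAD_INDEXRANGENODATA 0x80370000u
#define SC_BAD_NOTREADABLE      0x803A0000u

/* Returns true if the monitored item may be created for this initial status. */
static bool initial_status_allows_item(uint32_t status)
{
    /* the severity is encoded in the top bits; bit 31 set means "bad" */
    if ((status & 0x80000000u) == 0) return true;

    /* ignore the info bits in the lower 16 bits when comparing codes */
    switch (status & 0xFFFF0000u) {
    case SC_BAD_INDEXRANGENODATA: /* a bigger array may be written later */
    case SC_BAD_NOTREADABLE:      /* permissions of the node may change */
    case SC_BAD_USERACCESSDENIED: /* permissions of the user may change */
        return true;
    default:
        return false;
    }
}
```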

If the initial value is OK, a timer is created with the monitored item's sampling interval and a timer callback function that samples the value. If your device cannot meet the requested sampling interval, the provider may use a different sampling interval and write it to the monitored item; this interval is then passed to the client as the revised sampling interval.
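The lesson's code simply raises the interval to a configured minimum. For a device that only supports a fixed set of rates, revising the interval could look like the following sketch. The function name, the sorted table of supported intervals and the choice of the next slower rate are assumptions for illustration; the unit is whatever the monitored item uses, assumed here to be milliseconds.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Picks the sampling interval to use for a request.
 * 'supported' must be sorted in ascending order. The result is the
 * fastest supported interval that is not faster than the request, or
 * the slowest supported interval if the request is slower than all.
 */
static uint32_t revise_sampling_interval(uint32_t requested,
                                         const uint32_t *supported,
                                         size_t count)
{
    size_t i;

    if (count == 0) return requested; /* no restriction known */
    for (i = 0; i < count; i++) {
        if (supported[i] >= requested) return supported[i];
    }
    return supported[count - 1];
}
```

The returned value is what would be written back to the monitored item so that the client sees it as the revised sampling interval.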

The timer id is stored in the user data of the monitored item. The provider can use the user data freely to store its own information about the monitored item. Finally, the initial value is passed to the monitored item and the operation result is set to a good statuscode:

ua_statuscode custom_provider_add_item(struct ua_monitoreditem *item, uint32_t max_items)
{
int ret;
uint32_t timer_id;
uint8_t storeidx = 0;
struct uasession_session *session;
struct ua_datavalue *dv;
ua_node_t node;
UA_UNUSED(max_items);
/* lookup node handle */
/* get session from monitored item */
if (session == NULL) return UA_SCBADSESSIONCLOSED;
/* deadband is only allowed if the datatype is a subtype of number */
bool is_numeric = false;
ret = ua_monitoreditem_node_is_numeric(node, &is_numeric);
if (ret != 0 || !is_numeric) {
}
}
/* for deadband percent the EURange must be set */
uint32_t access_rights;
double high;
double low;
ret = ua_monitoreditem_node_read_eurange(node, &low, &high, &access_rights);
if (ret != 0) {
TRACE_WARNING(TRACE_FAC_SUBSCRIPTION, "%s: could not read EURange of request node, errorcode=%i\n", __func__, ret);
}
if ((access_rights & UA_ACCESSLEVEL_CURRENTWRITE) != 0) {
TRACE_WARNING(TRACE_FAC_SUBSCRIPTION, "%s: Percent deadband is not supported for writable EURange, requires custom implementation\n", __func__);
}
ua_monitoreditem_set_eurange(item, low, high);
}
/* allocate the initial value */
dv = IPC_ALLOC(dv);
if (dv == NULL)
/* read the initial value */
ua_variable_get_store_index(node, &storeidx);
if (storeidx != 0 || ua_monitoreditem_get_attributeid(item) != UA_ATTRIBUTEID_VALUE) {
node,
session,
dv);
} else {
custom_provider_read_value(
node,
session,
dv);
}
/* Check the result of the initial read. If bad the item is not created with few exceptions:
* INDEXRANGENODATA: a bigger array may be written to this value later
* NOTREADABLE, USERACCESSDENIED: permissions of node or user may change
*/
status = dv->status;
ipc_free(dv);
return status;
}
if (ua_monitoreditem_get_sampling_interval(item) < g_appconfig.subscription.min_sampling_interval) {
ua_monitoreditem_set_sampling_interval(item, g_appconfig.subscription.min_sampling_interval);
}
/* add a timer to check the attribute periodically */
ret = timer_add(NULL,
&timer_id,
custom_provider_timer_callback,
item,
if (ret != 0) {
if (ret == UA_EBADOUTOFRESOURCE) {
TRACE_ERROR(TRACE_FAC_APPLICATION, "%s: Not enough timers to create monitoreditem\n", __func__);
}
ipc_free(dv);
}
ua_monitoreditem_set_user_data(item, 0, timer_id);
/* add the initial value */
ret = ua_monitoreditem_new_value(item, dv);
if (ret != 0) {
TRACE_ERROR(TRACE_FAC_APPLICATION, "%s: ua_monitoreditem_new_value() failed with errorcode=%i\n", __func__, ret);
IPC_FREE(dv);
}
/* set operation result, this can be done in subscribe, but we already know it will succeed */
return 0;
}

After the SDK has called the add item function for all monitored items, it calls the subscribe function. The provider may start a mass operation to register all monitored items at the device. When the operation has finished, uaserver_subscribe_complete must be called so the SDK can continue processing the monitored items. uaserver_subscribe_complete does not need to be called inside the subscribe callback; it may also be called later, after an asynchronous operation has finished.

void custom_provider_subscribe(struct uaprovider_subscribe_ctx *ctx) {
/* the item is added/removed in the sync functions completely, so nothing to do here */
}

The monitored item has a field for the operation result. In the add item function it was already set to good, but it is evaluated only after the uaserver_subscribe_complete call. So it may also be set later to propagate an error during the (asynchronous) operation on the device to the SDK.

Sampling the Value

The value of the monitored item is sampled by the timer callback function. The ua_monitoreditem struct is passed as the data parameter, which needs to be cast to the monitored item. It is then used to get the node handle and session to read the value, which is given to the monitored item. The ua_monitoreditem_new_value function takes care of detecting a data change, applying filters and adding the value to the queue:

/* callback for timer if sampling interval of monitored item is due */
static int custom_provider_timer_callback(uint64_t elapsed, void *data)
{
struct ua_monitoreditem *item = data;
struct uasession_session *session;
struct ua_datavalue *dv;
uint8_t storeidx = 0;
int ret;
UA_UNUSED(elapsed);
/* allocate the value */
dv = IPC_ALLOC(dv);
if (dv == NULL)
return 0;
/* read the value */
if (node == UA_NODE_INVALID) {
} else {
if (session == NULL) {
} else {
ua_variable_get_store_index(node, &storeidx);
if (storeidx != 0 || ua_monitoreditem_get_attributeid(item) != UA_ATTRIBUTEID_VALUE) {
node,
session,
dv);
} else {
custom_provider_read_value(
node,
session,
dv);
}
}
}
/* add the value to the monitored item */
ret = ua_monitoreditem_new_value(item, dv);
if (ret != 0) {
TRACE_ERROR(TRACE_FAC_APPLICATION, "%s: ua_monitoreditem_new_value() failed with errorcode=%i\n", __func__, ret);
IPC_FREE(dv);
}
return 0;
}

Modify Item

When a client requests to modify monitored items the SDK calls the modify item handler function for each item to be modified. The SDK handles the change of the queue size and sets the new filter.

The provider needs to check the new filter to handle a possible deadband; these steps are identical to the add item operation. The new filters (deadband, datachange trigger and timestamps to return) are set at the monitored item before the modify function is called, so if your device supports any of these filters they can be updated in the modify operation. If the modify function returns a bad statuscode, these filters are reset by the SDK.

Furthermore, the provider needs to apply the new sampling interval. It is passed along with the monitored item as a parameter to the modify item function, so the function checks whether it changed, creates a new timer with the new sampling interval and sets the new interval at the monitored item. If your device does not support the requested sampling interval, this function may abort and return a bad statuscode or change the sampling interval to a nearby interval. The sampling interval actually used must always be set at the monitored item and will be returned to the client as the revised sampling interval. This implementation needs to remove the old timer and write the new timer id into the user data. Once the operation has succeeded, the operation result is set to good and the function can return:

ua_statuscode custom_provider_modify_item(struct ua_monitoreditem *item, uint32_t max_items, uint32_t new_sampling_interval)
{
ua_node_t node;
int ret;
UA_UNUSED(max_items);
/* deadband is only allowed if the datatype is a subtype of number */
bool is_numeric = false;
ret = ua_monitoreditem_node_is_numeric(node, &is_numeric);
if (ret != 0 || !is_numeric) {
}
}
/* for deadband percent the EURange must be set */
uint32_t access_rights;
double high;
double low;
ret = ua_monitoreditem_node_read_eurange(node, &low, &high, &access_rights);
if (ret != 0) {
TRACE_WARNING(TRACE_FAC_SUBSCRIPTION, "%s: could not read EURange of request node, errorcode=%i\n", __func__, ret);
}
if ((access_rights & UA_ACCESSLEVEL_CURRENTWRITE) != 0) {
TRACE_WARNING(TRACE_FAC_SUBSCRIPTION, "%s: Percent deadband is not supported for writable EURange, requires custom implementation\n", __func__);
}
ua_monitoreditem_set_eurange(item, low, high);
}
/* update sampling interval if changed */
if (new_sampling_interval != ua_monitoreditem_get_sampling_interval(item)) {
uint32_t timer_id;
if (new_sampling_interval < g_appconfig.subscription.min_sampling_interval) {
new_sampling_interval = g_appconfig.subscription.min_sampling_interval;
}
/* add new timer */
ret = timer_add(NULL,
&timer_id,
new_sampling_interval,
custom_provider_timer_callback,
item,
if (ret == UA_EBADOUTOFRESOURCE) {
TRACE_ERROR(TRACE_FAC_ALL, "%s: Not enough timers to modify monitoreditem\n", __func__);
} else if (ret != 0) {
}
/* update sampling interval in the item */
ua_monitoreditem_set_sampling_interval(item, new_sampling_interval);
/* remove old timer */
/* store id of new timer in item */
ua_monitoreditem_set_user_data(item, 0, timer_id);
}
/* set operation result, this can be done in subscribe, but we already know it will succeed */
return 0;
}

As in the add operation, the SDK will call the provider's subscribe function and the provider needs to call uaserver_subscribe_complete. This allows an asynchronous implementation of the modify operation. If the modify operation fails, the item must be left in the same state as before; in particular, it must not stop sampling.
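One way to meet the requirement that a failed modify leaves the item untouched is to create the new timer first and remove the old one only after that succeeded, which is also the order used in the modify handler of this lesson. The sketch below demonstrates this with a tiny mock timer table; `mock_timer_add`, `mock_timer_remove`, `mock_item` and `mock_item_modify` are hypothetical and not SDK functions.

```c
#include <stdint.h>

#define MAX_TIMERS 2 /* deliberately small so exhaustion is easy to show */

struct mock_timer {
    int used;
    uint32_t interval;
};

static struct mock_timer g_timers[MAX_TIMERS];

/* Allocates a free timer slot; returns 0 on success, -1 if none is left. */
static int mock_timer_add(uint32_t interval, uint32_t *id)
{
    uint32_t i;

    for (i = 0; i < MAX_TIMERS; i++) {
        if (!g_timers[i].used) {
            g_timers[i].used = 1;
            g_timers[i].interval = interval;
            *id = i;
            return 0;
        }
    }
    return -1;
}

static void mock_timer_remove(uint32_t id)
{
    if (id < MAX_TIMERS) g_timers[id].used = 0;
}

/* Hypothetical stand-in for the provider's view of a monitored item. */
struct mock_item {
    uint32_t timer_id;
    uint32_t sampling_interval;
};

/* Changes the sampling interval; on failure the item keeps sampling as before. */
static int mock_item_modify(struct mock_item *item, uint32_t new_interval)
{
    uint32_t new_id;

    if (new_interval == item->sampling_interval) return 0;
    /* create the new timer first; the old one is still running */
    if (mock_timer_add(new_interval, &new_id) != 0) return -1;
    /* only now is it safe to give up the old timer */
    mock_timer_remove(item->timer_id);
    item->timer_id = new_id;
    item->sampling_interval = new_interval;
    return 0;
}
```

The cost of this ordering is that one spare timer must be available during the modify; the benefit is that there is no moment at which the item has no timer.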

Set Item Mode

The set_item_mode callback is called when the client uses the SetMonitoringMode Service to change the monitoring mode of the item. When the mode is changed to disabled, the provider may stop sampling to save resources and continue when it is changed back to sampling or reporting. The SDK also tracks the current monitoring mode and automatically discards values for disabled items, so a fully functional implementation is not required; a dummy implementation like the one in this example is sufficient:

ua_statuscode custom_provider_set_item_mode(struct ua_monitoreditem *item, uint32_t max_items, enum ua_monitoringmode old_mode, enum ua_monitoringmode new_mode)
{
UA_UNUSED(old_mode);
UA_UNUSED(new_mode);
UA_UNUSED(max_items);
/* set operation result, this can be done in subscribe, but we already know it will succeed */
return 0;
}
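A provider that does want to save resources could derive the required action from the old and new mode. The following sketch uses hypothetical enum and function names; only the three monitoring modes (disabled, sampling, reporting) come from OPC UA. What stopping or starting means for the device, for example removing and re-adding the timer, is left to the implementation.

```c
/* Local stand-ins for the monitoring modes; not the SDK's enum. */
enum mock_mode {
    MOCK_MODE_DISABLED = 0,
    MOCK_MODE_SAMPLING = 1,
    MOCK_MODE_REPORTING = 2
};

enum mock_action {
    MOCK_ACTION_NONE,
    MOCK_ACTION_STOP_SAMPLING,
    MOCK_ACTION_START_SAMPLING
};

/* Decides whether sampling has to be stopped or started for a mode change. */
static enum mock_action mock_mode_change_action(enum mock_mode old_mode,
                                                enum mock_mode new_mode)
{
    int was_active = (old_mode != MOCK_MODE_DISABLED);
    int is_active = (new_mode != MOCK_MODE_DISABLED);

    if (was_active && !is_active) return MOCK_ACTION_STOP_SAMPLING;
    if (!was_active && is_active) return MOCK_ACTION_START_SAMPLING;
    /* sampling <-> reporting needs no change on the device side */
    return MOCK_ACTION_NONE;
}
```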

Remove Item

The remove item handler is called for each monitored item to be deleted, either by request from the client or if the associated subscription is deleted. The sample implementation only needs to delete the monitored item's timer and set the operation result to good:

ua_statuscode custom_provider_remove_item(struct ua_monitoreditem *item, uint32_t max_items)
{
int ret;
ret = timer_remove(NULL, ua_monitoreditem_get_user_data(item, 0), NULL);
if (ret != 0) return UA_SCBADINTERNALERROR;
UA_UNUSED(max_items);
/* set operation result, this can be done in subscribe, but we already know it will succeed */
return 0;
}

Again, the subscribe function is called to give the provider the opportunity to perform the removal as a mass and/or asynchronous operation.

Subscription Constraints

The previous sections give an example of how the monitored items of a subscription are added, modified or removed. However, there are some constraints and conditions in the monitored item mechanism that should be kept in mind when implementing your own provider:

  • The operation result is set to a bad statuscode before calling a handler, so it must be set to good for all operations if the operation succeeds
  • If the modify handler fails, the monitored item must not change and keep sampling
  • If the remove handler fails, the SDK will still delete the monitored item, so the provider must not access it again after the operation is finished
  • Only one operation is possible at the same time for a monitored item, e.g. a monitored item will not be removed while a modify operation is in progress.
  • None of the provider's handler functions must block. Long-running operations should be issued asynchronously and use the subscribe mechanism to return the result.

Step 5: Run Application

Compile and run the server application. When connecting to the server with a UA Client (e.g. UaExpert), the newly created nodes are visible in the server’s address space. It is also possible to read and subscribe to the values and to write to the variables.