OPC UA PubSub has different transport profiles (message mapping and transport protocol mapping). Most of them do not need custom handling and the default SDK implementation can be used. Such an example is UADP over MQTT or UADP over UDP multi-cast.
If transport profile like UADP over Ethernet has stricter timing requirements, the communication needs typically higher priority and optimized processing. Such requirements can be fulfilled by doing the following custom implementations:
These three custom handlers are typically implemented together.
For the custom data integration and custom timing, the SDK provides a callback interface where the application is called for every PubSub object during the start-up of the PubSub configuration. For each DataSetWriter, DataSetReader and WriterGroup, the application can decide if it wants to handle the processing instead of the default processing provided by the SDK. The pubsub_module_set_callback function is used to register the callback interface.
The custom network integration can be provided by implementing a custom network backend. Such a custom network backend can be registered with pubsub_module_set_network_backend.
The following code provides a simple example for implementing the message processing callbacks. The sample code creates PubSub test variables in the demo namespace and handles sending and receiving of messages containing these variables in the message processing callbacks.
#include "pubsub_application_callback.h"
#include "uapubsub_config.h"
#include <common/errors.h>
#include <uabase/statuscodes.h>
#include <uabase/identifiers.h>
#include <uabase/accesslevel.h>
#include <uabase/valuerank.h>
#include <uabase/structure/datasetfieldcontentmask.h>
#include <uabase/structure/fieldmetadata.h>
#include <uabase/structure/publisheddataitemsdatatype.h>
#include <uabase/structure/targetvariablesdatatype.h>
#include <uaserver/addressspace/variable.h>
#include <uaserver/addressspace/object.h>
#include <uaserver/valuestore/uint32store.h>
#include <uaencoder/boolean.h>
#include <uaencoder/int8.h>
#include <uaencoder/int16.h>
#include <uaencoder/int32.h>
#include <uaencoder/int64.h>
#include <uaencoder/uint8.h>
#include <uaencoder/uint16.h>
#include <uaencoder/uint32.h>
#include <uaencoder/uint64.h>
#include <uaencoder/float.h>
#include <uaencoder/double.h>
#define USE_PUBSUB_MODULE_DO_COM 1
struct pubsub_application_writer_group {
bool is_initialized;
pubsub_handle handle_writer_group;
double publishing_interval;
};
struct pubsub_application_dataset_writer {
bool is_initialized;
bool is_raw_encoding;
uint16_t field_count;
uint16_t *field_data_index_array;
};
void pubsub_application_dataset_writer_init(struct pubsub_application_dataset_writer *writer)
{
writer->is_initialized = false;
writer->is_raw_encoding = false;
writer->field_count = 0;
writer->fields = NULL;
writer->field_data_index_array = NULL;
}
struct pubsub_application_dataset_reader {
bool is_initialized;
bool is_raw_encoding;
uint16_t field_count;
int32_t *field_data_index_array;
};
void pubsub_application_dataset_reader_init(struct pubsub_application_dataset_reader *reader)
{
reader->is_initialized = false;
reader->is_raw_encoding = false;
reader->field_count = 0;
reader->fields = NULL;
reader->field_data_index_array = NULL;
}
struct pubsub_application_dataset_reader g_pubsub_application_dataset_readers[PUBSUB_NUM_READERS_MAX];
struct pubsub_application_dataset_writer g_pubsub_application_dataset_writers[PUBSUB_NUM_WRITERS_MAX];
static uint32_t g_values[20];
static bool pubsub_application_writer_group_handler(pubsub_handle writer_group);
static bool pubsub_application_dataset_writer_handler(
pubsub_handle dataset_writer,
void **user_data);
static bool pubsub_application_dataset_reader_handler(
pubsub_handle dataset_reader,
void **user_data);
pubsub_application_dataset_writer_handler,
pubsub_application_dataset_reader_handler };
{
return &g_pubsub_module_callback;
}
void pubsub_application_init(void)
{
int i = 0;
int ret;
uint16_t nsidx = 2;
for (i = 0; i < PUBSUB_NUM_WRITERS_MAX; i++) {
pubsub_application_dataset_writer_init(&g_pubsub_application_dataset_writers[i]);
}
ret = ua_uint32store_init(&g_store, 0, g_values,
countof(g_values));
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 10, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 20, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 30, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 40, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 50, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 60, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 70, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 80, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 90, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
if (ret != 0) return;
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
}
void pubsub_application_cleanup(void)
{
ua_uint32store_clear(&g_store);
}
static int pubsub_application_datasetwriter_sample_cb(
void *user_data,
bool *dataSetMessageValid,
uint8_t *dataSetMessageType,
uint16_t *dataSetStatus)
{
struct pubsub_application_dataset_writer *writer_mgt = (struct pubsub_application_dataset_writer *)user_data;
uint16_t i = 0;
int ret = 0;
if (writer_mgt->is_raw_encoding == false) {
ret = ua_encode_uint16(enc, &writer_mgt->field_count);
}
for (i = 0; i < writer_mgt->field_count; i++) {
if (writer_mgt->is_raw_encoding == false) {
ret = ua_encode_uint8(enc, &writer_mgt->fields[i].built_in_type);
if (ret != 0) break;
}
uint32_t tempValue = g_values[writer_mgt->field_data_index_array[i]];
ret = ua_encode_uint32(enc, &tempValue);
if (ret != 0) break;
}
if (ret == 0) {
*dataSetMessageValid = true;
*dataSetStatus = 0;
} else {
*dataSetMessageValid = false;
}
return 0;
}
static int pubsub_application_datasetreader_dispatch_cb(
void *user_data,
uint8_t dataSetMessageType,
uint16_t dataSetStatus,
{
struct pubsub_application_dataset_reader *reader_mgt = (struct pubsub_application_dataset_reader *)user_data;
uint16_t i = 0;
int ret = 0;
if (reader_mgt->is_raw_encoding == false) {
uint16_t temp_count;
ret = ua_decode_uint16(dec, &temp_count);
}
for (i = 0; i < reader_mgt->field_count; i++) {
uint8_t built_in_type = reader_mgt->fields[i].built_in_type;
if (reader_mgt->is_raw_encoding == false) {
ret = ua_decode_uint8(dec, &built_in_type);
if (ret != 0) break;
}
if (built_in_type != UA_VT_UINT32) {
break;
}
uint32_t tempValue;
ret = ua_decode_uint32(dec, &tempValue);
if (reader_mgt->field_data_index_array[i] >= 0) {
g_values[reader_mgt->field_data_index_array[i]] = tempValue;
}
if (ret != 0) break;
}
return 0;
}
static bool pubsub_application_dataset_writer_handler(
pubsub_handle dataset_writer,
void **user_data)
{
int32_t i = 0;
int ret = 0;
struct pubsub_application_dataset_writer *writer_manager = NULL;
uint16_t *field_data_index_array = NULL;
published_dataitems_data = published_dataset_data->data_set_source.
body.obj;
}
for (i = 0; i < PUBSUB_NUM_WRITERS_MAX; i++) {
if (g_pubsub_application_dataset_writers[i].is_initialized == false) {
writer_manager = &g_pubsub_application_dataset_writers[i];
break;
}
}
if (writer_manager == NULL || dataset_writer_data == NULL || published_dataset_data == NULL ||
published_dataitems_data == NULL) {
return false;
}
if (published_dataitems_data->num_published_data != published_dataset_data->data_set_meta_data.num_fields) {
return false;
}
field_data_index_array = malloc(sizeof(uint16_t) * published_dataset_data->data_set_meta_data.num_fields);
for (i = 0; i < published_dataset_data->data_set_meta_data.num_fields; i++) {
if (published_dataset_data->data_set_meta_data.fields[i].built_in_type != UA_VT_UINT32) {
ret = UA_EBADINVALIDARGUMENT;
}
if (published_dataset_data->data_set_meta_data.fields[i].value_rank != -1) {
ret = UA_EBADINVALIDARGUMENT;
}
published_dataitems_data->published_data[i].published_variable.
identifier.
numeric >= 2000 &&
published_dataitems_data->published_data[i].published_variable.
identifier.
numeric < 2010) {
field_data_index_array[i] =
(uint16_t)published_dataitems_data->published_data[i].published_variable.
identifier.
numeric - 2000;
} else {
ret = UA_EBADINVALIDARGUMENT;
}
if (ret != 0) break;
}
if (ret == 0) {
writer_manager->is_initialized = true;
if (dataset_writer_data->data_set_field_content_mask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
writer_manager->is_raw_encoding = true;
}
writer_manager->field_count = (uint16_t)published_dataset_data->data_set_meta_data.num_fields;
writer_manager->fields = published_dataset_data->data_set_meta_data.fields;
writer_manager->field_data_index_array = field_data_index_array;
*user_data = (void *)writer_manager;
*sample_cb = pubsub_application_datasetwriter_sample_cb;
return true;
} else {
free(field_data_index_array);
return false;
}
}
static bool pubsub_application_dataset_reader_handler(
pubsub_handle dataset_reader,
void **user_data)
{
int32_t i, j = 0;
int ret = 0;
struct pubsub_application_dataset_reader *reader_manager = NULL;
int32_t *field_data_index_array = NULL;
if (dataset_reader_data && dataset_reader_data->subscribed_data_set.
type_id.
nsindex == 0 &&
target_variables_data = dataset_reader_data->subscribed_data_set.
body.obj;
}
for (i = 0; i < PUBSUB_NUM_WRITERS_MAX; i++) {
if (g_pubsub_application_dataset_readers[i].is_initialized == false) {
reader_manager = &g_pubsub_application_dataset_readers[i];
break;
}
}
if (reader_manager == NULL || dataset_reader_data == NULL || target_variables_data == NULL) {
return false;
}
if (dataset_reader_data->data_set_meta_data.num_fields < target_variables_data->num_target_variables) {
return false;
}
field_data_index_array = malloc(sizeof(int32_t) * dataset_reader_data->data_set_meta_data.num_fields);
for (i = 0; i < dataset_reader_data->data_set_meta_data.num_fields; i++) {
if (dataset_reader_data->data_set_meta_data.fields[i].value_rank != -1) {
ret = UA_EBADINVALIDARGUMENT;
}
field_data_index_array[i] = -1;
for (j = 0; j < target_variables_data->num_target_variables; j++) {
if (0 == ua_guid_compare(
&dataset_reader_data->data_set_meta_data.fields[i].data_set_field_id,
&target_variables_data->target_variables[j].data_set_field_id)) {
target_variables_data->target_variables[j].target_node_id.
identifier.
numeric >= 2100 &&
target_variables_data->target_variables[j].target_node_id.
identifier.
numeric < 2110) {
field_data_index_array[i] =
(int32_t)target_variables_data->target_variables[j].target_node_id.
identifier.
numeric -
2100 + 10;
} else {
ret = UA_EBADINVALIDARGUMENT;
}
break;
}
}
if (ret != 0) break;
}
if (ret == 0) {
reader_manager->is_initialized = true;
if (dataset_reader_data->data_set_field_content_mask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
reader_manager->is_raw_encoding = true;
}
reader_manager->field_count = (uint16_t)dataset_reader_data->data_set_meta_data.num_fields;
reader_manager->fields = dataset_reader_data->data_set_meta_data.fields;
reader_manager->field_data_index_array = field_data_index_array;
*user_data = (void *)reader_manager;
*dispatch_cb = pubsub_application_datasetreader_dispatch_cb;
return true;
} else {
free(field_data_index_array);
return false;
}
}
static bool pubsub_application_writer_group_handler(pubsub_handle writer_group)
{
return false;
}
void pubsub_application_simulate_data(void)
{
int i = 0;
int count = 10;
for (i = 0; i < count; i++) {
g_values[i]++;
}
}