High Performance OPC UA Server SDK  1.7.1.383
Custom PubSub Message Handling

The PubSub Stack and SDK integration allows custom PubSub message handling. Such custom message handling is mainly necessary for real-time communication parts. Since the OPC UA server is typically running with lower priorities than real-time components, also the PubSub communication is executed in the same priority.

OPC UA PubSub has different transport profiles (message mapping and transport protocol mapping). Most of them do not need custom handling and the default SDK implementation can be used. Such an example is UADP over MQTT or UADP over UDP multi-cast.

If transport profile like UADP over Ethernet has stricter timing requirements, the communication needs typically higher priority and optimized processing. Such requirements can be fulfilled by doing the following custom implementations:

  • Custom data integration to encode the data to be sent by a DataSetWriter directly from the data source into the network message or to decode the data received by a DataSetReader directly from the network message to the data target.
  • Custom timing of WriterGroup for sampling and publishing of network message at the configured offsets
  • Custom network integration for optimized buffer management, prioritized sending and receiving of network messages

These three custom handlers are typically implemented together.

For the custom data integration and custom timing, the SDK provides a callback interface where the application is called for every PubSub object during the start-up of the PubSub configuration. For each DataSetWriter, DataSetReader and WriterGroup, the application can decide if it wants to handle the processing instead of the default processing provided by the SDK. The pubsub_module_set_callback function is used to register the callback interface.

The custom network integration can be provided by implementing a custom network backend. Such a custom network backend can be registered with pubsub_module_set_network_backend.

The following code provides a simple example for implementing the message processing callbacks. The sample code creates PubSub test variables in the demo namespace and handles sending and receiving of messages containing these variables in the message processing callbacks.

pubsub_application_callback.h

#ifndef PUBSUB_APPLICATION_CALLBACK_H
#define PUBSUB_APPLICATION_CALLBACK_H
#include "pubsub_module.h"
#include <uaserver/init.h>
struct pubsub_module_callback* pubsub_application_callback_get(void);
void pubsub_application_init(void);
void pubsub_application_cleanup(void);
void pubsub_application_simulate_data(void);
#endif /* end of include guard: PUBSUB_APPLICATION_CALLBACK_H */

pubsub_application_callback.c

#include "pubsub_application_callback.h"
#include "uapubsub_config.h"
#include <common/errors.h>
#include <uabase/statuscodes.h>
#include <uabase/identifiers.h>
#include <uabase/accesslevel.h>
#include <uabase/valuerank.h>
#include <uabase/structure/datasetfieldcontentmask.h>
#include <uabase/structure/fieldmetadata.h>
#include <uabase/structure/publisheddataitemsdatatype.h>
#include <uabase/structure/targetvariablesdatatype.h>
#include <uaserver/addressspace/variable.h>
#include <uaserver/addressspace/object.h>
#include <uaserver/valuestore/uint32store.h>
#include <uaencoder/boolean.h>
#include <uaencoder/int8.h>
#include <uaencoder/int16.h>
#include <uaencoder/int32.h>
#include <uaencoder/int64.h>
#include <uaencoder/uint8.h>
#include <uaencoder/uint16.h>
#include <uaencoder/uint32.h>
#include <uaencoder/uint64.h>
#include <uaencoder/float.h>
#include <uaencoder/double.h>
#define USE_PUBSUB_MODULE_DO_COM 1
struct pubsub_application_writer_group {
bool is_initialized;
pubsub_handle handle_writer_group;
double publishing_interval;
};
struct pubsub_application_dataset_writer {
bool is_initialized;
bool is_raw_encoding;
uint16_t field_count;
struct ua_fieldmetadata *fields;
uint16_t *field_data_index_array;
};
void pubsub_application_dataset_writer_init(struct pubsub_application_dataset_writer *writer)
{
writer->is_initialized = false;
writer->is_raw_encoding = false;
writer->field_count = 0;
writer->fields = NULL;
writer->field_data_index_array = NULL;
}
struct pubsub_application_dataset_reader {
bool is_initialized;
bool is_raw_encoding;
uint16_t field_count;
struct ua_fieldmetadata *fields;
int32_t *field_data_index_array;
};
void pubsub_application_dataset_reader_init(struct pubsub_application_dataset_reader *reader)
{
reader->is_initialized = false;
reader->is_raw_encoding = false;
reader->field_count = 0;
reader->fields = NULL;
reader->field_data_index_array = NULL;
}
struct pubsub_application_dataset_reader g_pubsub_application_dataset_readers[PUBSUB_NUM_READERS_MAX];
struct pubsub_application_dataset_writer g_pubsub_application_dataset_writers[PUBSUB_NUM_WRITERS_MAX];
/* Value store used for UInt32 variables, 10 for publishing and 10 for subscribing*/
static uint32_t g_values[20];
static struct ua_uint32store g_store;
static bool pubsub_application_writer_group_handler(pubsub_handle writer_group);
static bool pubsub_application_dataset_writer_handler(
pubsub_handle dataset_writer,
void **user_data);
static bool pubsub_application_dataset_reader_handler(
pubsub_handle dataset_reader,
void **user_data);
struct pubsub_module_callback g_pubsub_module_callback = { pubsub_application_writer_group_handler,
pubsub_application_dataset_writer_handler,
pubsub_application_dataset_reader_handler };
struct pubsub_module_callback *pubsub_application_callback_get(void)
{
return &g_pubsub_module_callback;
}
void pubsub_application_init(void)
{
int i = 0;
int ret;
ua_node_t node, folder;
struct ua_nodeid id;
struct ua_nodeid datatypeid;
uint16_t nsidx = 2;
for (i = 0; i < PUBSUB_NUM_WRITERS_MAX; i++) {
pubsub_application_dataset_writer_init(&g_pubsub_application_dataset_writers[i]);
}
/* register the in-memory store */
ret = ua_uint32store_init(&g_store, 0, g_values, countof(g_values));
if (ret != 0) return;
ua_nodeid_set_numeric(&datatypeid, 0, UA_ID_UINT32);
/* Create folder for Published Variables */
ua_nodeid_set_numeric(&id, nsidx, 1001);
folder = ua_object_create_folder(&id, UA_NODE_OBJECTSFOLDER, "Published");
if (folder == UA_NODE_INVALID) return;
/* Variable1 */
ua_nodeid_set_numeric(&id, nsidx, 2000);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable1", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable2 */
ua_nodeid_set_numeric(&id, nsidx, 2001);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable2", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 10, node);
if (ret != 0) return;
/* Variable3 */
ua_nodeid_set_numeric(&id, nsidx, 2002);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable3", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 20, node);
if (ret != 0) return;
/* Variable4 */
ua_nodeid_set_numeric(&id, nsidx, 2003);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable4", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 30, node);
if (ret != 0) return;
/* Variable5 */
ua_nodeid_set_numeric(&id, nsidx, 2004);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable5", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 40, node);
if (ret != 0) return;
/* Variable6 */
ua_nodeid_set_numeric(&id, nsidx, 2005);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable6", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 50, node);
if (ret != 0) return;
/* Variable7 */
ua_nodeid_set_numeric(&id, nsidx, 2006);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable7", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 60, node);
if (ret != 0) return;
/* Variable8 */
ua_nodeid_set_numeric(&id, nsidx, 2007);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable8", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 70, node);
if (ret != 0) return;
/* Variable9 */
ua_nodeid_set_numeric(&id, nsidx, 2008);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable9", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 80, node);
if (ret != 0) return;
/* Variable10 */
ua_nodeid_set_numeric(&id, nsidx, 2009);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable10", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 90, node);
if (ret != 0) return;
/* Create folder for Subscribed Variables */
ua_nodeid_set_numeric(&id, nsidx, 1002);
folder = ua_object_create_folder(&id, UA_NODE_OBJECTSFOLDER, "Subscribed");
if (folder == UA_NODE_INVALID) return;
/* Variable1 */
ua_nodeid_set_numeric(&id, nsidx, 2100);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable1", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable2 */
ua_nodeid_set_numeric(&id, nsidx, 2101);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable2", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable3 */
ua_nodeid_set_numeric(&id, nsidx, 2102);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable3", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable4 */
ua_nodeid_set_numeric(&id, nsidx, 2103);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable4", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable5 */
ua_nodeid_set_numeric(&id, nsidx, 2104);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable5", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable6 */
ua_nodeid_set_numeric(&id, nsidx, 2105);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable6", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable7 */
ua_nodeid_set_numeric(&id, nsidx, 2106);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable7", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable8 */
ua_nodeid_set_numeric(&id, nsidx, 2107);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable8", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable9 */
ua_nodeid_set_numeric(&id, nsidx, 2108);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable9", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
/* Variable10 */
ua_nodeid_set_numeric(&id, nsidx, 2109);
&id, UA_NODECLASS_VARIABLE, id.nsindex, "Variable10", NULL, UA_NODE_INVALID, folder, UA_NODE_ORGANIZES);
if (node == UA_NODE_INVALID) return;
if (ret != 0) return;
/* set value */
ret = ua_uint32store_register_node(&g_store, 0, node);
if (ret != 0) return;
}
void pubsub_application_cleanup(void)
{
ua_uint32store_clear(&g_store);
}
static int pubsub_application_datasetwriter_sample_cb(
struct ua_encoder_context *enc,
void *user_data,
bool *dataSetMessageValid,
uint8_t *dataSetMessageType,
uint16_t *dataSetStatus)
{
UA_UNUSED(dataSetMessageType);
struct pubsub_application_dataset_writer *writer_mgt = (struct pubsub_application_dataset_writer *)user_data;
uint16_t i = 0;
int ret = 0;
if (writer_mgt->is_raw_encoding == false) {
ret = ua_encode_uint16(enc, &writer_mgt->field_count);
}
for (i = 0; i < writer_mgt->field_count; i++) {
if (writer_mgt->is_raw_encoding == false) {
/* If we do not have raw encoding, we need to encode the variant type first*/
ret = ua_encode_uint8(enc, &writer_mgt->fields[i].built_in_type);
if (ret != 0) break;
}
/* Get the current value out of the value array by using the index stored for the field */
uint32_t tempValue = g_values[writer_mgt->field_data_index_array[i]];
/* Encode the value into the message */
ret = ua_encode_uint32(enc, &tempValue);
if (ret != 0) break;
}
if (ret == 0) {
*dataSetMessageValid = true;
*dataSetStatus = 0;
} else {
*dataSetMessageValid = false;
}
return 0;
}
static int pubsub_application_datasetreader_dispatch_cb(
struct ua_decoder_context *dec,
void *user_data,
uint8_t dataSetMessageType,
uint16_t dataSetStatus,
ua_datetime timestamp)
{
UA_UNUSED(dataSetMessageType);
UA_UNUSED(dataSetStatus);
UA_UNUSED(timestamp);
struct pubsub_application_dataset_reader *reader_mgt = (struct pubsub_application_dataset_reader *)user_data;
uint16_t i = 0;
int ret = 0;
if (reader_mgt->is_raw_encoding == false) {
uint16_t temp_count;
ret = ua_decode_uint16(dec, &temp_count);
}
for (i = 0; i < reader_mgt->field_count; i++) {
uint8_t built_in_type = reader_mgt->fields[i].built_in_type;
if (reader_mgt->is_raw_encoding == false) {
/* If we do not have raw encoding, we need to decode the variant type first*/
ret = ua_decode_uint8(dec, &built_in_type);
if (ret != 0) break;
}
if (built_in_type != UA_VT_UINT32) {
/* ToDo */
/* Skip fields that have an index of -1 */
/* ToDo */
break;
}
/* Get the current value out of the value array by using the index stored for the field */
uint32_t tempValue;
/* Encode the value into the message */
ret = ua_decode_uint32(dec, &tempValue);
if (reader_mgt->field_data_index_array[i] >= 0) {
/* Assign the new value to the variable value array entry */
g_values[reader_mgt->field_data_index_array[i]] = tempValue;
}
if (ret != 0) break;
}
return 0;
}
static bool pubsub_application_dataset_writer_handler(
pubsub_handle dataset_writer,
void **user_data)
{
int32_t i = 0;
int ret = 0;
const struct ua_datasetwriterdatatype *dataset_writer_data = pubsub_module_get_dataset_writer(dataset_writer);
const struct ua_publisheddatasetdatatype *published_dataset_data =
const struct ua_publisheddataitemsdatatype *published_dataitems_data = NULL;
struct pubsub_application_dataset_writer *writer_manager = NULL;
uint16_t *field_data_index_array = NULL;
if (pubsub_module_get_published_dataset && published_dataset_data->data_set_source.type_id.nsindex == 0 &&
published_dataset_data->data_set_source.type_id.type == UA_IDENTIFIER_NUMERIC &&
published_dataset_data->data_set_source.type_id.identifier.numeric == UA_ID_PUBLISHEDDATAITEMSDATATYPE &&
published_dataset_data->data_set_source.encoding == UA_EXTENSIONOBJECT_ENCODEABLEOBJECT) {
/* We have Published Variables as source */
published_dataitems_data = published_dataset_data->data_set_source.body.obj;
}
for (i = 0; i < PUBSUB_NUM_WRITERS_MAX; i++) {
if (g_pubsub_application_dataset_writers[i].is_initialized == false) {
writer_manager = &g_pubsub_application_dataset_writers[i];
break;
}
}
/* Check if we found a writer manager used and if the configuration structures are available */
if (writer_manager == NULL || dataset_writer_data == NULL || published_dataset_data == NULL ||
published_dataitems_data == NULL) {
/* The configuration is not complete */
return false;
}
if (published_dataitems_data->num_published_data != published_dataset_data->data_set_meta_data.num_fields) {
/* The number of published variables does not match the number of fields in the DataSetMetaData */
return false;
}
field_data_index_array = malloc(sizeof(uint16_t) * published_dataset_data->data_set_meta_data.num_fields);
for (i = 0; i < published_dataset_data->data_set_meta_data.num_fields; i++) {
/* Check if we have the expected field data type */
if (published_dataset_data->data_set_meta_data.fields[i].built_in_type != UA_VT_UINT32) {
ret = UA_EBADINVALIDARGUMENT;
}
/* Check if the value rank of the field is skalar */
if (published_dataset_data->data_set_meta_data.fields[i].value_rank != -1) {
ret = UA_EBADINVALIDARGUMENT;
}
/* Check if we have a valid NodeId */
if (published_dataitems_data->published_data[i].published_variable.type == UA_IDENTIFIER_NUMERIC &&
published_dataitems_data->published_data[i].published_variable.identifier.numeric >= 2000 &&
published_dataitems_data->published_data[i].published_variable.identifier.numeric < 2010) {
/* Map the numeric identifier of the NodeId to the index */
field_data_index_array[i] =
(uint16_t)published_dataitems_data->published_data[i].published_variable.identifier.numeric - 2000;
} else {
ret = UA_EBADINVALIDARGUMENT;
}
/* Break processing if we have an error*/
if (ret != 0) break;
}
if (ret == 0) {
/* Initialize internal management structure */
writer_manager->is_initialized = true;
if (dataset_writer_data->data_set_field_content_mask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
writer_manager->is_raw_encoding = true;
}
/* Set DataSetMetaData field information in management structure */
writer_manager->field_count = (uint16_t)published_dataset_data->data_set_meta_data.num_fields;
writer_manager->fields = published_dataset_data->data_set_meta_data.fields;
writer_manager->field_data_index_array = field_data_index_array;
/* Provide user data and callback function */
*user_data = (void *)writer_manager;
*sample_cb = pubsub_application_datasetwriter_sample_cb;
return true;
} else {
free(field_data_index_array);
return false;
}
}
static bool pubsub_application_dataset_reader_handler(
pubsub_handle dataset_reader,
void **user_data)
{
int32_t i, j = 0;
int ret = 0;
const struct ua_datasetreaderdatatype *dataset_reader_data = pubsub_module_get_dataset_reader(dataset_reader);
const struct ua_targetvariablesdatatype *target_variables_data = NULL;
struct pubsub_application_dataset_reader *reader_manager = NULL;
int32_t *field_data_index_array = NULL;
if (dataset_reader_data && dataset_reader_data->subscribed_data_set.type_id.nsindex == 0 &&
dataset_reader_data->subscribed_data_set.type_id.type == UA_IDENTIFIER_NUMERIC &&
dataset_reader_data->subscribed_data_set.type_id.identifier.numeric == UA_ID_TARGETVARIABLESDATATYPE &&
dataset_reader_data->subscribed_data_set.encoding == UA_EXTENSIONOBJECT_ENCODEABLEOBJECT) {
/* We have Target Variables as subscribed data set */
target_variables_data = dataset_reader_data->subscribed_data_set.body.obj;
}
for (i = 0; i < PUBSUB_NUM_WRITERS_MAX; i++) {
if (g_pubsub_application_dataset_readers[i].is_initialized == false) {
reader_manager = &g_pubsub_application_dataset_readers[i];
break;
}
}
/* Check if we found a reader manager used and if the configuration structures are available */
if (reader_manager == NULL || dataset_reader_data == NULL || target_variables_data == NULL) {
/* The configuration is not complete */
return false;
}
if (dataset_reader_data->data_set_meta_data.num_fields < target_variables_data->num_target_variables) {
/* The number of target variables is larger than the number of fields in the DataSetMetaData */
return false;
}
field_data_index_array = malloc(sizeof(int32_t) * dataset_reader_data->data_set_meta_data.num_fields);
for (i = 0; i < dataset_reader_data->data_set_meta_data.num_fields; i++) {
/* Check if the value rank of the field is skalar */
if (dataset_reader_data->data_set_meta_data.fields[i].value_rank != -1) {
ret = UA_EBADINVALIDARGUMENT;
}
/* Initialize with invalid index */
field_data_index_array[i] = -1;
/* Try to find a target variable for the data set field */
for (j = 0; j < target_variables_data->num_target_variables; j++) {
if (0 == ua_guid_compare(
&dataset_reader_data->data_set_meta_data.fields[i].data_set_field_id,
&target_variables_data->target_variables[j].data_set_field_id)) {
/* Check if we have a valid NodeId */
if (target_variables_data->target_variables[j].target_node_id.type == UA_IDENTIFIER_NUMERIC &&
target_variables_data->target_variables[j].target_node_id.identifier.numeric >= 2100 &&
target_variables_data->target_variables[j].target_node_id.identifier.numeric < 2110) {
/* Map the numeric identifier of the NodeId to the index in the g_values array (index 10 - 19) */
field_data_index_array[i] =
(int32_t)target_variables_data->target_variables[j].target_node_id.identifier.numeric -
2100 + 10;
} else {
ret = UA_EBADINVALIDARGUMENT;
}
/* We have a match of the field IDs */
break;
}
}
/* Break processing if we have an error*/
if (ret != 0) break;
}
if (ret == 0) {
/* Initialize internal management structure */
reader_manager->is_initialized = true;
if (dataset_reader_data->data_set_field_content_mask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
reader_manager->is_raw_encoding = true;
}
/* Set DataSetMetaData field information in management structure */
reader_manager->field_count = (uint16_t)dataset_reader_data->data_set_meta_data.num_fields;
reader_manager->fields = dataset_reader_data->data_set_meta_data.fields;
reader_manager->field_data_index_array = field_data_index_array;
/* Provide user data and callback function */
*user_data = (void *)reader_manager;
*dispatch_cb = pubsub_application_datasetreader_dispatch_cb;
return true;
} else {
free(field_data_index_array);
return false;
}
}
static bool pubsub_application_writer_group_handler(pubsub_handle writer_group)
{
UA_UNUSED(writer_group);
/* The writer group timing is handled in pubsub_module */
return false;
}
void pubsub_application_simulate_data(void)
{
int i = 0;
/* The first 10 values are published */
int count = 10;
/* Increment values */
for (i = 0; i < count; i++) {
g_values[i]++;
}
}