.NET Based OPC UA Client/Server SDK  3.1.0.500
Slow Data Source Example

The Demo Server address space contains Variables that return their Read and Write results very slowly. They can be found in the folder Demo → 017_SpecialVariables. The source code can be found in the file DemoNodeManager.SlowDataSource.cs.

Background

In real-world applications, nodes in UA servers may represent devices that answer very slowly, e.g. in an error case. If access to these data sources is implemented incorrectly, the server blocks for as long as the call to the API of the data source is outstanding. The implementation in the server differs depending on whether the API for accessing the device is synchronous or asynchronous.

Data Sources

In our example, there are classes that simulate slowly responding devices. The devices have methods for reading and writing. For simplicity, the device is represented by a single UINT32 value. There is also a simulation in the device classes: The value is incremented every 500 ms (see sample code below).

#region Base class
/// <summary>
/// Common base of the simulated devices: holds a single UInt32 value that a
/// timer increments every 500 ms, plus the delay used by the derived APIs.
/// </summary>
internal class SlowDataSource
{
    // Guards every access to m_value.
    protected object m_lock = new object();

    // The simulated device value.
    protected uint m_value = 0;

    // Delay in milliseconds that the derived classes sleep before answering.
    protected int m_delay = 2000;

    // Kept in a field so the simulation timer stays referenced.
    private Timer m_timer;

    /// <summary>
    /// Starts the simulation: first tick after 500 ms, then every 500 ms.
    /// </summary>
    public SlowDataSource()
    {
        const int simulationPeriod = 500;
        m_timer = new Timer(DoUpdate, null, simulationPeriod, simulationPeriod);
    }

    /// <summary>
    /// Timer callback: increments the simulated value by one under the lock.
    /// </summary>
    void DoUpdate(object state)
    {
        lock (m_lock)
        {
            m_value += 1;
        }
    }
}
#endregion
#region Synchronous API
/// <summary>
/// Simulated device with a blocking API: each call sleeps for m_delay
/// milliseconds before it accesses the value.
/// </summary>
internal class SlowDataSourceSynchronousAPI : SlowDataSource
{
    /// <summary>
    /// Blocks for m_delay milliseconds, then returns the current value.
    /// </summary>
    public uint ReadValue()
    {
        System.Threading.Thread.Sleep(m_delay);
        lock (m_lock)
        {
            return m_value;
        }
    }

    /// <summary>
    /// Blocks for m_delay milliseconds, then stores the new value.
    /// </summary>
    /// <param name="value">The value to store.</param>
    /// <returns>Always true.</returns>
    public bool WriteValue(uint value)
    {
        System.Threading.Thread.Sleep(m_delay);
        lock (m_lock)
        {
            m_value = value;
        }
        return true;
    }
}
#endregion
#region Asynchronous API
/// <summary>
/// Simulated device with a callback based API: BeginRead and BeginWrite return
/// immediately; the work is queued to the .NET thread pool, sleeps for m_delay
/// milliseconds and then reports its result through the supplied callback.
/// </summary>
internal class SlowDataSourceAsynchronousAPI : SlowDataSource
{
    /// <summary>Callback invoked when a read has finished.</summary>
    public delegate void ReadComplete(uint value, object userData);

    /// <summary>Callback invoked when a write has finished.</summary>
    public delegate void WriteComplete(bool status, object userData);

    /// <summary>
    /// Queues a read of the device value.
    /// </summary>
    /// <param name="callback">Invoked on a thread pool thread with the value that was read.</param>
    /// <param name="userData">Passed through unchanged to <paramref name="callback"/>.</param>
    public void BeginRead(ReadComplete callback, object userData)
    {
        ThreadPool.QueueUserWorkItem(Read, new ReadData()
        {
            Callback = callback,
            UserData = userData
        });
    }

    // State object handed from BeginRead to Read.
    class ReadData
    {
        public ReadComplete Callback { get; set; }
        public object UserData { get; set; }
    }

    // Work item behind BeginRead.
    private void Read(object state)
    {
        System.Threading.Thread.Sleep(m_delay);
        ReadData data = (ReadData)state;

        // Copy the value under the lock and invoke the callback afterwards, the
        // same way Write does. User code must not run while m_lock is held:
        // it would stall the simulation timer and every other reader/writer.
        uint value;
        lock (m_lock)
        {
            value = m_value;
        }
        data.Callback(value, data.UserData);
    }

    /// <summary>
    /// Queues a write of the device value.
    /// </summary>
    /// <param name="value">The value to store.</param>
    /// <param name="callback">Invoked on a thread pool thread once the value has been stored.</param>
    /// <param name="userData">Passed through unchanged to <paramref name="callback"/>.</param>
    public void BeginWrite(uint value, WriteComplete callback, object userData)
    {
        ThreadPool.QueueUserWorkItem(Write, new WriteData()
        {
            Callback = callback,
            Value = value,
            UserData = userData
        });
    }

    // Work item behind BeginWrite.
    private void Write(object state)
    {
        System.Threading.Thread.Sleep(m_delay);
        WriteData data = (WriteData)state;
        lock (m_lock)
        {
            m_value = data.Value;
        }
        data.Callback(true, data.UserData);
    }

    // State object handed from BeginWrite to Write.
    private class WriteData
    {
        public WriteComplete Callback { get; set; }
        public uint Value { get; set; }
        public object UserData { get; set; }
    }
}
#endregion

Wrapper Classes

We create wrapper classes for the devices and use these wrappers to configure the variables (see sample code below). The main usage of these wrappers will be explained later when the code for monitoring is added.

// Excerpt of the wrapper for the synchronous data source: only the constructor
// and the DataSource accessor are shown here; the listing stops before the
// closing brace of the class.
internal class SlowDataSourceWrapperSynchronous : SlowDataSourceWrapper
{
// Stores the wrapped device.
public SlowDataSourceWrapperSynchronous(SlowDataSourceSynchronousAPI dataSource)
{
m_dataSource = dataSource;
}
// Gives access to the wrapped device.
public SlowDataSourceSynchronousAPI DataSource
{
get
{
return m_dataSource;
}
}

Read

Reading the device values takes some time. So we must not read these values in the method

RequestContext context,
NodeAttributeHandle nodeHandle,
string indexRange,
QualifiedName dataEncoding)

since the server would be blocked until this call has returned. So we need to call the Read method of the device in the method

protected override void Read(
RequestContext context,
TransactionHandle transaction,
IList<NodeAttributeOperationHandle> operationHandles,
IList<ReadValueId> settings)

and invoke the callback of the transaction when the read of the device has finished.

Synchronous

For the synchronous API, we can use the ApplicationThreadPool to queue the Read calls. This example is designed to be adapted to other data sources. You only need to implement the ReadInJob delegate–the method OnReadSlowVariable in this case–and copy and paste the remaining code to your application.

// Nodes whose UserData is a SlowDataSourceWrapperSynchronous are not read inline:
// the read is handed to DoReadInJob together with everything needed to complete
// the transaction later; OnReadSlowVariable performs the actual device read.
if (operationHandles[ii].NodeHandle.UserData is SlowDataSourceWrapperSynchronous)
{
// We have to read the value in a thread
DoReadInJob(
new ReadInJobData()
{
Context = context,
Transaction = transaction,
OperationHandle = operationHandles[ii],
ReadValueId = settings[ii],
Callback = OnReadSlowVariable
});
}
/// <summary>
/// Queues the read described by <paramref name="data"/> to Server.ThreadPool;
/// OnReadInJob is registered as the job callback.
/// </summary>
private void DoReadInJob(
ReadInJobData data
)
{
Server.ThreadPool.Queue(data, OnReadInJob);
}
/// <summary>
/// Job callback registered by DoReadInJob. Runs the ReadInJob delegate stored in
/// the job data and passes the resulting DataValue to the transaction callback.
/// When <paramref name="error"/> is bad, nothing is done.
/// </summary>
/// <param name="state">The ReadInJobData that was queued.</param>
/// <param name="error">Status handed in by the thread pool.</param>
private void OnReadInJob(object state, StatusCode error)
{
    if (!error.IsBad())
    {
        ReadInJobData job = state as ReadInJobData;

        // This is the slow part: the delegate talks to the device.
        DataValue result = job.Callback(job.ReadValueId, job.OperationHandle.NodeHandle);

        ReadCompleteEventHandler complete = (ReadCompleteEventHandler)job.Transaction.Callback;
        complete(job.OperationHandle, job.Transaction.CallbackData, result, false);
    }
}
/// <summary>
/// Signature of the method that performs the actual (slow) read for one node;
/// it is invoked by OnReadInJob.
/// </summary>
delegate DataValue ReadInJob(ReadValueId nodeToRead, NodeAttributeHandle nodeHandle);
/// <summary>
/// ReadInJob implementation for the synchronous slow data source: reads the
/// device value through the wrapper stored in the node handle's UserData and
/// wraps it in a DataValue stamped with the current UTC time.
/// </summary>
DataValue OnReadSlowVariable(ReadValueId nodeToRead, NodeAttributeHandle nodeHandle)
{
    var wrapper = nodeHandle.UserData as SlowDataSourceWrapperSynchronous;

    // Blocking call into the device.
    uint deviceValue = wrapper.DataSource.ReadValue();

    DataValue result = new DataValue();
    result.WrappedValue = new Variant(deviceValue);
    result.SourceTimestamp = DateTime.UtcNow;
    result.ServerTimestamp = DateTime.UtcNow;
    return result;
}
/// <summary>
/// Everything a queued read needs: it is created by the Read dispatch code,
/// queued by DoReadInJob and consumed by OnReadInJob.
/// </summary>
class ReadInJobData
{
// Request context of the Read call that created the job.
public RequestContext Context { get; set; }
// Transaction whose callback is invoked with the result.
public TransactionHandle Transaction { get; set; }
// Operation handle of the node being read.
public NodeAttributeOperationHandle OperationHandle { get; set; }
// The read settings for this node.
public ReadValueId ReadValueId { get; set; }
// The method that performs the actual read.
public ReadInJob Callback { get; set; }
}

Asynchronous

When using the asynchronous API, we can invoke the transaction callback in the callback of the data source:

// Nodes backed by the asynchronous data source: start the device read and hand the
// transaction and the operation handle along as user data, so that
// OnSlowAsynReadComplete can invoke the transaction callback with the result.
var dataSourceWrapper = operationHandles[ii].NodeHandle.UserData as SlowDataSourceWrapperAsynchronous;
if (dataSourceWrapper != null)
{
dataSourceWrapper.DataSource.BeginRead(OnSlowAsynReadComplete, new ReadAsyncData()
{
Transaction = transaction,
OperationHandle = operationHandles[ii]
});
}
}
/// <summary>
/// Completion callback passed to BeginRead of the asynchronous data source.
/// Wraps the value in a DataValue stamped with the current UTC time and hands
/// it to the transaction callback stored in the user data.
/// </summary>
/// <param name="value">The value read from the device.</param>
/// <param name="userData">The ReadAsyncData that was passed to BeginRead.</param>
private void OnSlowAsynReadComplete(uint value, object userData)
{
    ReadAsyncData pending = userData as ReadAsyncData;

    DataValue result = new DataValue();
    result.WrappedValue = new Variant(value);
    result.SourceTimestamp = DateTime.UtcNow;
    result.ServerTimestamp = DateTime.UtcNow;

    ReadCompleteEventHandler complete = (ReadCompleteEventHandler)pending.Transaction.Callback;
    complete(pending.OperationHandle, pending.Transaction.CallbackData, result, false);
}
/// <summary>
/// User data passed to BeginRead and handed back to OnSlowAsynReadComplete.
/// </summary>
class ReadAsyncData
{
// Transaction whose callback is invoked with the result.
public TransactionHandle Transaction { get; set; }
// Operation handle of the node being read.
public NodeAttributeOperationHandle OperationHandle { get; set; }
}

Write

The Write implementation is equivalent to the Read implementation and is therefore not shown in this documentation.

Monitoring

Even if the external data source is not event based and we can get current values only by calling a Read method, we must not set the NodeHandleType to ExternalPolled. Using this NodeHandleType could waste the threads of the ThreadPool (of the OPC UA server in the synchronous case or of the device in the asynchronous case). As a result, the server could block.

So we need to use the NodeHandleType ExternalPushed and perform our own sampling of the data source. This sampling is done by the wrapper classes of the data source. When the first MonitoredItem is added to the wrapper, the sampling is started. When the last MonitoredItem is removed, the sampling is stopped again. Since we are using the NodeHandleType ExternalPushed, we need to implement

  • StartDataMonitoring
  • ModifyDataMonitoring
  • StopDataMonitoring
  • SetDataMonitoringMode

In these methods, the MonitoredItems are added to and removed from the wrapper classes:

/// <summary>
/// Validates the create request and, if the validation result is not bad,
/// registers a new monitored item with the data source wrapper stored in the
/// node handle's UserData.
/// </summary>
/// <returns>The result of Server.ValidateDataMonitoringRequest.</returns>
private DataMonitoringResult StartDataMonitoringSlowVariables(
    RequestContext context,
    MonitoredItemHandle itemHandle,
    MonitoredItemCreateRequest settings,
    DataChangeEventHandler callback)
{
    DataMonitoringResult validation = Server.ValidateDataMonitoringRequest(
        context,
        itemHandle.NodeHandle,
        settings.ItemToMonitor,
        settings.RequestedParameters,
        null);

    if (!validation.StatusCode.IsBad())
    {
        SlowDataSourceWrapper wrapper = itemHandle.NodeHandle.UserData as SlowDataSourceWrapper;

        var newItem = new SlowDataSourceWrapperMonitoredItem();
        newItem.ItemHandle = itemHandle;
        newItem.MonitoringMode = settings.MonitoringMode;
        newItem.SampligInterval = (int)settings.RequestedParameters.SamplingInterval;
        newItem.Callback = callback;

        // The wrapper starts or adjusts its own polling and reports an initial value.
        wrapper.AddMonitoredItem(newItem);
    }

    return validation;
}
/// <summary>
/// Validates a modify request for a monitored item of a slow variable.
/// </summary>
/// <returns>The result of Server.ValidateDataMonitoringRequest, good or bad.</returns>
private DataMonitoringResult ModifyDataMonitoringSlowVariables(
    RequestContext context,
    MonitoredItemHandle itemHandle,
    MonitoredItemModifyRequest settings)
{
    // The validation result is returned unchanged in the good and in the bad
    // case, so no status check is needed here.
    // NOTE(review): the requested parameters are only validated; they are not
    // forwarded to the data source wrapper - confirm that this is intended.
    return Server.ValidateDataMonitoringRequest(
        context,
        itemHandle.NodeHandle,
        null,
        settings.RequestedParameters,
        null);
}
/// <summary>
/// Removes the monitored item from the data source wrapper stored in the
/// node handle's UserData.
/// </summary>
/// <returns>Always StatusCodes.Good.</returns>
private StatusCode? StopDataMonitoringSlowVariables(
    RequestContext context,
    MonitoredItemHandle itemHandle)
{
    var wrapper = itemHandle.NodeHandle.UserData as SlowDataSourceWrapper;

    // The wrapper stops or adjusts its polling on its own.
    wrapper.RemoveMonitoredItem(itemHandle);

    return StatusCodes.Good;
}
/// <summary>
/// Applies a new monitoring mode to the monitored item held by the data source
/// wrapper stored in the node handle's UserData.
/// </summary>
/// <returns>
/// StatusCodes.Good if the wrapper knows the item;
/// StatusCodes.BadMonitoredItemIdInvalid if the wrapper reports that it does not.
/// </returns>
private StatusCode? SetDataMonitoringModeSlowVariables(
    RequestContext context,
    MonitoredItemHandle itemHandle,
    MonitoringMode monitoringMode,
    MonitoringParameters parameters)
{
    SlowDataSourceWrapper dataSourceWrapper = itemHandle.NodeHandle.UserData as SlowDataSourceWrapper;

    // SetMonitoringMode returns false when no item with this handle is
    // registered; report that instead of silently answering Good.
    if (!dataSourceWrapper.SetMonitoringMode(itemHandle, monitoringMode, parameters))
    {
        return StatusCodes.BadMonitoredItemIdInvalid;
    }

    return StatusCodes.Good;
}
/// <summary>
/// Base class of the data source wrappers. It keeps the list of monitored
/// items attached to one data source and runs one polling timer for them.
/// The timer is started when polling is first needed and disposed when the
/// last monitored item is removed. Derived classes implement OnPoll to read
/// the device and to report the value to the monitored items.
/// </summary>
internal abstract class SlowDataSourceWrapper
{
    public SlowDataSourceWrapper()
    {
        m_lock = new object();
    }

    /// <summary>
    /// Adds a monitored item, starts or adjusts the polling and reports an
    /// initial value to the item: the last polled value if one is available
    /// and the polling interval did not change, BadWaitingForInitialData otherwise.
    /// </summary>
    /// <param name="monitoredItem">The item to add.</param>
    public void AddMonitoredItem(SlowDataSourceWrapperMonitoredItem monitoredItem)
    {
        lock (m_lock)
        {
            if (m_monitoredItems == null)
            {
                m_monitoredItems = new List<SlowDataSourceWrapperMonitoredItem>();
            }
            m_monitoredItems.Add(monitoredItem);

            int lastSamplingInterval = m_currentSamplingInterval;
            UpdatePolling();

            if (m_lastValue != null && m_currentSamplingInterval == lastSamplingInterval)
            {
                monitoredItem.Callback(null, monitoredItem.ItemHandle, m_lastValue, false);
            }
            else
            {
                monitoredItem.Callback(null, monitoredItem.ItemHandle, new DataValue(StatusCodes.BadWaitingForInitialData), false);
            }
        }
    }

    /// <summary>
    /// Removes the monitored item with the given handle. Polling is stopped
    /// when no item is left and re-evaluated otherwise.
    /// </summary>
    /// <param name="itemHandle">Handle of the item to remove.</param>
    /// <returns>true if an item was removed; false if no such item is registered.</returns>
    public bool RemoveMonitoredItem(MonitoredItemHandle itemHandle)
    {
        bool itemRemoved = false;
        lock (m_lock)
        {
            if (m_monitoredItems == null)
            {
                return false;
            }

            int index = m_monitoredItems.FindIndex(item => item.ItemHandle == itemHandle);
            if (index >= 0)
            {
                m_monitoredItems.RemoveAt(index);
                itemRemoved = true;
            }

            if (m_monitoredItems.Count == 0)
            {
                StopPolling();
            }
            else
            {
                UpdatePolling();
            }
        }
        return itemRemoved;
    }

    /// <summary>
    /// Updates monitoring mode and sampling interval of a monitored item,
    /// re-evaluates the polling and reports the current value to the item.
    /// </summary>
    /// <param name="itemHandle">Handle of the item to change.</param>
    /// <param name="monitoringMode">The new monitoring mode.</param>
    /// <param name="parameters">Supplies the new sampling interval; the interval is left unchanged when null.</param>
    /// <returns>true if the item was found; false otherwise.</returns>
    public bool SetMonitoringMode(
        MonitoredItemHandle itemHandle,
        MonitoringMode monitoringMode,
        MonitoringParameters parameters)
    {
        lock (m_lock)
        {
            // No item has ever been added.
            if (m_monitoredItems == null)
            {
                return false;
            }

            foreach (var monitoredItem in m_monitoredItems)
            {
                if (monitoredItem.ItemHandle == itemHandle)
                {
                    if (parameters != null)
                    {
                        monitoredItem.SampligInterval = (int)parameters.SamplingInterval;
                    }
                    monitoredItem.MonitoringMode = monitoringMode;
                    UpdatePolling();

                    // UpdatePolling clears m_lastValue when it restarts the timer.
                    // Report BadWaitingForInitialData in that case, as
                    // AddMonitoredItem does, instead of passing null.
                    monitoredItem.Callback(null, monitoredItem.ItemHandle, CurrentValueOrWaiting(), false);
                    return true;
                }
            }
        }
        return false;
    }

    /// <summary>
    /// Restarts the polling timer when the interval required by the monitored
    /// items differs from the interval currently in use. This also covers the
    /// first start, where the current interval is still 0.
    /// </summary>
    protected void UpdatePolling()
    {
        lock (m_lock)
        {
            int samplingInterval = NewSamplingInterval();

            // Restart on any change: the interval has to go down when a faster
            // item arrives and up again when the fastest item has been removed.
            if (samplingInterval != m_currentSamplingInterval)
            {
                if (m_timer != null)
                {
                    m_timer.Dispose();
                    m_lastValue = null;
                }

                // A fresh token per timer: the derived classes compare it with
                // m_instance and drop poll results that carry an older token.
                m_instance = new object();
                m_timer = new Timer(OnPoll, m_instance, 0, samplingInterval);
                m_currentSamplingInterval = samplingInterval;
            }
        }
    }

    /// <summary>
    /// Determines the polling interval for the current set of monitored items:
    /// the smallest requested interval, mapped to 250, 1000 or 5000 ms.
    /// </summary>
    /// <returns>250, 1000 or 5000; 5000 when there are no items.</returns>
    protected int NewSamplingInterval()
    {
        int newSamplingInterval = 5000;
        if (m_monitoredItems != null)
        {
            foreach (SlowDataSourceWrapperMonitoredItem monitoredItem in m_monitoredItems)
            {
                newSamplingInterval = Math.Min(newSamplingInterval, monitoredItem.SampligInterval);
            }
        }

        if (newSamplingInterval <= 250)
        {
            return 250;
        }
        if (newSamplingInterval <= 1000)
        {
            return 1000;
        }
        return 5000;
    }

    /// <summary>
    /// Timer callback. <paramref name="state"/> is the token (m_instance) of
    /// the timer that fired.
    /// </summary>
    protected abstract void OnPoll(object state);

    // Disposes the timer (if there is one) and resets the polling state.
    // Must be called with m_lock held.
    private void StopPolling()
    {
        if (m_timer != null)
        {
            m_timer.Dispose();
            m_timer = null;
        }
        m_instance = null;
        m_lastValue = null;
        m_currentSamplingInterval = 0;
    }

    // Returns the last polled value, or a BadWaitingForInitialData value while
    // none is available. Must be called with m_lock held.
    private DataValue CurrentValueOrWaiting()
    {
        if (m_lastValue != null)
        {
            return m_lastValue;
        }
        return new DataValue(StatusCodes.BadWaitingForInitialData);
    }

    // Guards all fields below.
    protected object m_lock;
    // Token of the currently active timer; null while polling is stopped.
    protected object m_instance;
    protected Timer m_timer;
    protected List<SlowDataSourceWrapperMonitoredItem> m_monitoredItems;
    // Interval of the active timer in milliseconds; 0 while polling is stopped.
    protected int m_currentSamplingInterval;
    // Last value delivered by a poll; null until the active timer has delivered one.
    protected DataValue m_lastValue;
}
/// <summary>
/// Wrapper for the data source with the synchronous API. Each poll reads the
/// device with a blocking call and reports the value to all monitored items.
/// </summary>
internal class SlowDataSourceWrapperSynchronous : SlowDataSourceWrapper
{
    private SlowDataSourceSynchronousAPI m_dataSource;

    public SlowDataSourceWrapperSynchronous(SlowDataSourceSynchronousAPI dataSource)
    {
        m_dataSource = dataSource;
    }

    /// <summary>The wrapped device.</summary>
    public SlowDataSourceSynchronousAPI DataSource
    {
        get { return m_dataSource; }
    }

    /// <summary>
    /// Timer callback: reads the device outside the lock, then publishes the
    /// value if <paramref name="state"/> is still the current m_instance.
    /// </summary>
    protected override void OnPoll(object state)
    {
        uint deviceValue = m_dataSource.ReadValue();

        lock (m_lock)
        {
            // m_instance changed while the read was running: drop the result.
            if (m_instance != state)
            {
                return;
            }

            DataValue sample = new DataValue();
            sample.WrappedValue = new Variant(deviceValue);
            sample.SourceTimestamp = DateTime.UtcNow;
            sample.ServerTimestamp = DateTime.UtcNow;
            m_lastValue = sample;

            foreach (SlowDataSourceWrapperMonitoredItem subscriber in m_monitoredItems)
            {
                subscriber.Callback(null, subscriber.ItemHandle, m_lastValue, false);
            }
        }
    }
}
/// <summary>
/// Wrapper for the data source with the asynchronous API. Each poll starts a
/// read on the device; the value is reported to all monitored items from the
/// read completion callback.
/// </summary>
internal class SlowDataSourceWrapperAsynchronous : SlowDataSourceWrapper
{
    private SlowDataSourceAsynchronousAPI m_dataSource;

    public SlowDataSourceWrapperAsynchronous(SlowDataSourceAsynchronousAPI dataSource)
    {
        m_dataSource = dataSource;
    }

    /// <summary>The wrapped device.</summary>
    public SlowDataSourceAsynchronousAPI DataSource
    {
        get { return m_dataSource; }
    }

    /// <summary>
    /// Timer callback: starts a read and passes the timer token on as user data.
    /// </summary>
    protected override void OnPoll(object state)
    {
        m_dataSource.BeginRead(OnReadComplete, state);
    }

    /// <summary>
    /// Read completion: publishes the value if <paramref name="userData"/> is
    /// still the current m_instance.
    /// </summary>
    void OnReadComplete(uint value, object userData)
    {
        lock (m_lock)
        {
            // m_instance changed while the read was running: drop the result.
            if (m_instance != userData)
            {
                return;
            }

            DataValue sample = new DataValue();
            sample.WrappedValue = new Variant(value);
            sample.SourceTimestamp = DateTime.UtcNow;
            sample.ServerTimestamp = DateTime.UtcNow;
            m_lastValue = sample;

            foreach (SlowDataSourceWrapperMonitoredItem subscriber in m_monitoredItems)
            {
                subscriber.Callback(null, subscriber.ItemHandle, m_lastValue, false);
            }
        }
    }
}
/// <summary>
/// One monitored item as tracked by a SlowDataSourceWrapper.
/// </summary>
internal class SlowDataSourceWrapperMonitoredItem
{
// Handle that identifies the item; used to find it again on remove/modify.
public MonitoredItemHandle ItemHandle { get; set; }
// Monitoring mode last set for the item.
public MonitoringMode MonitoringMode { get; set; }
// Requested sampling interval as an int. The name is misspelled ("Samplig")
// but is used by other code in this file, so it is kept as is.
public int SampligInterval { get; set; }
// Invoked by the wrapper to deliver values to the item.
public DataChangeEventHandler Callback { get; set; }
}