PID loop on a real PLC

Keep Nexus on the embedded address while binding Sigils to the PLC OPC UA endpoint and running a PID in a Weave.

Nexus keeps listening on its usual embedded address, and only the Sigils that set opc_ua_url are bound to the PLC’s OPC UA endpoint. The Weave runs a small PID step each cycle: read the process value from the PLC, read the setpoint from the twin (or from another PLC-backed Sigil), compute the output, and write the actuator command back to the PLC.

Replace PLC_URL, node IDs, and credentials with values from your PLC vendor’s address space and security setup. If the PLC uses signing or encryption, supply the same security_policy, certificates, and message_security_mode you use elsewhere with asyncua (see Sigil in the SDK).

Example

from aether.nexus import Nexus
from aether.sigil import Sigil
from aether.weave import Weave

# Embedded OPC UA + HTTP/MCP (address your HMIs / tools use)
nexus = Nexus(opc_ua_url="opc.tcp://0.0.0.0:4840")

PLC_URL = "opc.tcp://192.168.1.100:4840"  # your PLC

# Setpoint lives on the embedded server so operators can adjust it through Aether (REST, MCP, or OPC UA)
setpoint = Sigil(
    node_id="ns=2;s=Loop.Setpoint",
    initial_value=50.0,
    description="Temperature setpoint (°C).",
)

# Live process variable from the PLC (read-only in Aether)
process_value = Sigil(
    node_id="ns=3;i=1001",  # replace with your PLC NodeId (string form)
    opc_ua_url=PLC_URL,
    user_name="opcuser",
    password="your_password",
    initial_value=0.0,
    writable=False,
    description="Measured temperature from PLC.",
)

# Actuator command to the PLC (e.g. heater %)
control_output = Sigil(
    node_id="ns=3;i=1002",
    opc_ua_url=PLC_URL,
    user_name="opcuser",
    password="your_password",
    initial_value=0.0,
    minimum_value=0.0,
    maximum_value=100.0,
    description="Heater demand to PLC (%).",
)

PID_DT = 0.05  # PID time step in seconds (50 ms); keep equal to the Weave cycle_time below
Kp, Ki, Kd = 2.0, 0.15, 0.02


# Controller state carried from one Weave cycle to the next
class _PidState:
    integral = 0.0
    prev_err = 0.0


_pid = _PidState()


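async def pid_step():
    # Read the setpoint (embedded server) and the measurement (PLC)
    sp = float(await setpoint.read())
    pv = float(await process_value.read())
    err = sp - pv

    # Accumulate the integral and estimate the error derivative over one cycle
    _pid.integral += err * PID_DT
    deriv = (err - _pid.prev_err) / PID_DT if PID_DT > 0 else 0.0
    _pid.prev_err = err

    # Sum the P, I, and D terms, clamp to the actuator range, and write to the PLC
    out = Kp * err + Ki * _pid.integral + Kd * deriv
    out = max(0.0, min(100.0, out))
    await control_output.write(out)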


Weave(
    label="temperature_pid",
    cycle_time=PID_DT,
    callback=pid_step,
    description="Closed-loop PID: PV and output on PLC; setpoint on Aether.",
)

if __name__ == "__main__":
    nexus.start()
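
Before pointing the loop at a real actuator, it can help to dry-run the same arithmetic offline. The sketch below is not part of Aether and does not touch a PLC: it repeats the PID update from pid_step() in plain Python and runs it against an assumed first-order heater model. The names pid_output and simulate, and the plant parameters ambient, gain, and tau, are hypothetical and chosen only for illustration.

```python
# Offline dry run: same PID arithmetic as pid_step(), no Aether or PLC involved.
PID_DT = 0.05                  # seconds, same value as in the example
Kp, Ki, Kd = 2.0, 0.15, 0.02   # same gains as in the example


def pid_output(err, state):
    """One PID update; state is a dict holding 'integral' and 'prev_err'."""
    state["integral"] += err * PID_DT
    deriv = (err - state["prev_err"]) / PID_DT
    state["prev_err"] = err
    out = Kp * err + Ki * state["integral"] + Kd * deriv
    return max(0.0, min(100.0, out))  # clamp to the 0..100 % actuator range


def simulate(setpoint=50.0, steps=2400, ambient=20.0, gain=0.8, tau=10.0):
    """Run the loop against an assumed first-order heater model.

    gain (degC per % demand) and tau (seconds) are made-up plant parameters.
    Returns a list of (pv, out) pairs, one per cycle.
    """
    state = {"integral": 0.0, "prev_err": 0.0}
    pv = ambient
    history = []
    for _ in range(steps):
        out = pid_output(setpoint - pv, state)
        # First-order lag: pv relaxes toward ambient + gain * out
        pv += (ambient + gain * out - pv) * PID_DT / tau
        history.append((pv, out))
    return history


if __name__ == "__main__":
    final_pv, final_out = simulate()[-1]
    print(f"final pv={final_pv:.2f} degC, output={final_out:.1f} %")
```

A real plant will respond differently from this model, so treat the result as a sanity check on signs, units, and clamping rather than as a tuning result.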

Tuning

Choose cycle_time to match what your PLC and process allow (often tens to hundreds of milliseconds), and keep PID_DT equal to it so the integral and derivative terms stay correctly scaled. Tune Kp, Ki, and Kd for your plant. For production, consider adding anti-windup, output rate limits, and setpoint ramping.
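
The three production measures can be sketched in plain Python as follows. This is one possible arrangement, not an Aether API: HardenedPid, clamp, and ramp_toward are hypothetical names, the rate limits (20 % per second on the output, 1 °C per second on the setpoint) are placeholder values, and the anti-windup shown is simple conditional integration rather than the only option.

```python
from dataclasses import dataclass
from typing import Optional


def clamp(value, low, high):
    return max(low, min(high, value))


def ramp_toward(current, target, max_step):
    """Move current toward target by at most max_step per call."""
    return current + clamp(target - current, -max_step, max_step)


@dataclass
class HardenedPid:
    kp: float = 2.0
    ki: float = 0.15
    kd: float = 0.02
    dt: float = 0.05            # seconds; keep equal to the Weave cycle_time
    out_min: float = 0.0
    out_max: float = 100.0
    max_out_rate: float = 20.0  # assumed output limit, % per second
    max_sp_rate: float = 1.0    # assumed setpoint limit, degC per second
    integral: float = 0.0
    prev_err: float = 0.0
    prev_out: float = 0.0
    ramped_sp: Optional[float] = None

    def step(self, target_sp, pv):
        # Setpoint ramping: start at the current measurement, then approach
        # the operator's target at a bounded rate.
        if self.ramped_sp is None:
            self.ramped_sp = pv
        self.ramped_sp = ramp_toward(
            self.ramped_sp, target_sp, self.max_sp_rate * self.dt
        )
        err = self.ramped_sp - pv
        deriv = (err - self.prev_err) / self.dt
        self.prev_err = err

        # Anti-windup (conditional integration): accept the new integral only
        # if the output is unsaturated or the error pushes it back into range.
        candidate = self.integral + err * self.dt
        raw = self.kp * err + self.ki * candidate + self.kd * deriv
        out = clamp(raw, self.out_min, self.out_max)
        unwinding = (raw > self.out_max and err < 0) or (
            raw < self.out_min and err > 0
        )
        if raw == out or unwinding:
            self.integral = candidate

        # Output rate limit: bound the change per cycle.
        out = ramp_toward(self.prev_out, out, self.max_out_rate * self.dt)
        self.prev_out = out
        return out
```

Inside pid_step(), such an object would replace the inline arithmetic: read sp and pv as before, call step(sp, pv) on a single long-lived instance, and write the returned value to control_output.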