Sphero Public SDK

Sphero Public SDK

  • Getting Started
  • How To
  • Documentation
  • Get the Code
  • Samples
  • Troubleshooting
  • FAQs

›How to Use the Raspberry Pi SDKs

How to Use the Raspberry Pi SDKs

  • Python

How To Use the Raspberry Pi Python SDK

Asynchronous

API and Shell (Pinging)

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
        the same data. Echo can be used to check to see if RVR is connected and awake.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    echo_response = await rvr.echo(
        data=[0, 1, 2],
        target=SpheroRvrTargets.primary.value
    )
    print('Echo response 1: ', echo_response)

    echo_response = await rvr.echo(
        data=[4, 5, 6],
        target=SpheroRvrTargets.secondary.value
    )
    print('Echo response 2: ', echo_response)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

API and Shell (Pinging) with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import SpheroRvrTargets


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def main():
    """ This program demonstrates how to use the echo command, which sends data to RVR and has RVR returns
        the same data. Echo can be used to check to see if RVR is connected and awake.  In order to test it,
        a node.js server must be running on the raspberry-pi connected to RVR.  This code is meant to be
        executed from a separate computer.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    echo_response = await rvr.echo(
        data=[0, 1, 2],
        target=SpheroRvrTargets.primary.value
    )
    print('Echo response 1: ', echo_response)

    echo_response = await rvr.echo(
        data=[4, 5, 6],
        target=SpheroRvrTargets.secondary.value
    )
    print('Echo response 2: ', echo_response)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Color Sensor (Color Detection)

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def color_detected_handler(color_detected_data):
    print('Color detection data response: ', color_detected_data)


async def main():
    """ This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
        to report colors detected. To exit program, press <CTRL-C>
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.enable_color_detection(is_enabled=True)
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.color_detection,
        handler=color_detected_handler
    )
    await rvr.sensor_control.start(interval=250)

    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.enable_color_detection(is_enabled=False),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Color Sensor (Color Detection) with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def color_detected_handler(color_detected_data):
    print('Color detection data response: ', color_detected_data)


async def main():
    """ This program uses the color sensor on RVR (located on the down side of RVR, facing the floor) to report colors detected.
        To exit program, press <CTRL-C>
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.enable_color_detection(is_enabled=True)
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.color_detection,
        handler=color_detected_handler
    )
    await rvr.sensor_control.start(interval=250)

    while True:
        await asyncio.sleep(1)


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.enable_color_detection(is_enabled=False),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Drive - Raw Motors

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RawMotorModesEnum


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program has RVR drive around in different directions.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.reset_yaw()

    await rvr.raw_motors(
        left_mode=RawMotorModesEnum.forward.value,
        left_speed=128,  # Valid speed values are 0-255
        right_mode=RawMotorModesEnum.forward.value,
        right_speed=128  # Valid speed values are 0-255
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.raw_motors(
        left_mode=RawMotorModesEnum.reverse.value,
        left_speed=64,  # Valid speed values are 0-255
        right_mode=RawMotorModesEnum.reverse.value,
        right_speed=64  # Valid speed values are 0-255
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.raw_motors(
        left_mode=RawMotorModesEnum.reverse.value,
        left_speed=128,  # Valid speed values are 0-255
        right_mode=RawMotorModesEnum.forward.value,
        right_speed=128  # Valid speed values are 0-255
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.raw_motors(
        left_mode=RawMotorModesEnum.forward.value,
        left_speed=128,  # Valid speed values are 0-255
        right_mode=RawMotorModesEnum.forward.value,
        right_speed=128  # Valid speed values are 0-255
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.raw_motors(
        left_mode=RawMotorModesEnum.off.value,
        left_speed=0,  # Valid speed values are 0-255
        right_mode=RawMotorModesEnum.off.value,
        right_speed=0  # Valid speed values are 0-255
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Drive with Heading

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program has RVR drive around in different directions using the function drive_with_heading.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.reset_yaw()

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.drive_reverse.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=90,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=270,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=0,  # Valid heading values are 0-359
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Drive with Heading - Reverse Mode

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program has RVR drive around in different directions using the function drive_with_heading.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.reset_yaw()

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=90,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.drive_reverse.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=0,  # Valid heading values are 0-359
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Drive with Helper

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program has RVR drive with how to drive RVR using the drive control helper.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.drive_control.reset_heading()

    await rvr.drive_control.drive_forward_seconds(
        speed=64,
        heading=0,  # Valid heading values are 0-359
        time_to_drive=1
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_control.drive_backward_seconds(
        speed=64,
        heading=0,  # Valid heading values are 0-359
        time_to_drive=1
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_control.turn_left_degrees(
        heading=0,  # Valid heading values are 0-359
        amount=90
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Drive with Helper - Roll

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program has RVR drive with how to drive RVR using the drive control helper.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.drive_control.reset_heading()

    await rvr.drive_control.roll_start(
        speed=64,
        heading=90
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_control.roll_stop(heading=270)

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Drive with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import DriveFlagsBitmask


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def main():
    """ This program has RVR drive using the Rest API.  In order to test it, a node.js server must be running on the
        raspberry-pi connected to RVR.  This code is meant to be executed from a separate computer.

        Note:
            To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
            go on and execute all the statements and exit without the driving ever taking place.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.reset_yaw()

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.drive_reverse.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=90,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=128,  # Valid speed values are 0-255
        heading=270,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.drive_with_heading(
        speed=0,  # Valid heading values are 0-359
        heading=0,  # Valid heading values are 0-359
        flags=DriveFlagsBitmask.none.value
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Infrared - Broadcast IR

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.start_robot_to_robot_infrared_broadcasting(
        far_code=InfraredCodes.one.value,
        near_code=InfraredCodes.zero.value
    )

    for i in range(2):
        await rvr.raw_motors(
            left_mode=RawMotorModesEnum.forward.value,
            left_speed=64,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.forward.value,
            right_speed=64  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        await asyncio.sleep(2)

    await rvr.stop_robot_to_robot_infrared_broadcasting()

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.stop_robot_to_robot_infrared_broadcasting(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Infrared - Bradcast IR with Helper

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.infrared_control.start_infrared_broadcasting(
        far_code=InfraredCodes.one,
        near_code=InfraredCodes.zero
    )

    await rvr.raw_motors(
        left_mode=RawMotorModesEnum.forward.value,
        left_speed=64,  # Valid speed values are 0-255
        right_mode=RawMotorModesEnum.forward.value,
        right_speed=64  # Valid speed values are 0-255
    )

    # Delay to allow RVR to drive
    await asyncio.sleep(4)

    await rvr.infrared_control.stop_infrared_broadcasting()

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.stop_robot_to_robot_infrared_broadcasting(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Infrared - Listen For and Send IR

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def infrared_message_received_handler(infrared_message):
    print('Infrared message response: ', infrared_message)


async def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)

    await rvr.enable_robot_infrared_message_notify(is_enabled=True)

    infrared_code = 3
    strength = 64

    while True:
        await rvr.send_infrared_message(
            infrared_code=infrared_code,
            front_strength=strength,
            left_strength=strength,
            right_strength=strength,
            rear_strength=strength
        )

        print('Infrared message sent with code: {0}'.format(infrared_code))

        await asyncio.sleep(2)


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.stop_robot_to_robot_infrared_broadcasting(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Infrared - Listen For and Send IR with Helper

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def infrared_message_received_handler(infrared_message):
    print('Infrared message response: ', infrared_message)


async def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)

    codes = [
        InfraredCodes.zero,
        InfraredCodes.one,
        InfraredCodes.two,
        InfraredCodes.three
    ]

    while True:
        await rvr.infrared_control.send_infrared_messages(
            messages=codes,
            strength=64
        )

        print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))

        await asyncio.sleep(2)


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.stop_robot_to_robot_infrared_broadcasting(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Infrared - Listen For and Send IR with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def infrared_message_received_handler(response):
    print('Response data for IR message received:',response)


async def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

        To try this out, write a script for your other robot that a) broadcasts on the corresponding
        channel that RVR is set to listen to [in this case channel 0] and b) listens on the channel which
        RVR sends messages on [in this case channel 3]
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)

    await rvr.enable_robot_infrared_message_notify(is_enabled=True)

    infrared_code = 3
    strength = 64

    while True:
        await rvr.send_infrared_message(
            infrared_code=infrared_code,
            front_strength=strength,
            left_strength=strength,
            right_strength=strength,
            rear_strength=strength
        )

        print('Infrared message sent with code: {0}'.format(infrared_code))

        await asyncio.sleep(2)


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.stop_robot_to_robot_infrared_broadcasting(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set All LEDs

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to set the all the LEDs.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.all_lights.value,
        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.all_lights.value,
        led_brightness_values=[color for x in range(10) for color in [0, 255, 0]]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set All LEDs with Helper

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.led_control.turn_leds_off()

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.set_all_leds_color(color=Colors.yellow)

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.turn_leds_off()

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set Multiple LEDs

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to set multiple LEDs.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.all_lights.value,
        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
        led_brightness_values=[
            0, 0, 255,
            255, 0, 0
        ]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set Multiple LEDs with Helper

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to set multiple LEDs on RVR using the LED control helper.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.led_control.turn_leds_off()

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.set_multiple_leds_with_enums(
        leds=[
            RvrLedGroups.headlight_left,
            RvrLedGroups.headlight_right
        ],
        colors=[
            Colors.green,
            Colors.blue
        ]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.set_multiple_leds_with_rgb(
        leds=[
            RvrLedGroups.headlight_left,
            RvrLedGroups.headlight_right
        ],
        colors=[
            255, 0, 0,
            0, 255, 0
        ]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set Multiple LEDs with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import RestfulAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def main():
    """ This program demonstrates how to set multiple LEDs.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.all_lights.value,
        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
        led_brightness_values=[
            0, 0, 255,
            255, 0, 0
        ]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set Single LED

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to set a single LED.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.all_lights.value,
        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.headlight_right.value,   # 0xe00
        led_brightness_values=[255, 0, 0]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.set_all_leds(
        led_group=RvrLedGroups.headlight_left.value,    # 0x1c0
        led_brightness_values=[0, 255, 0]
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

LEDs - Set Single LED with Helper

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to set a single LED on RVR using the LED control helper.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.led_control.turn_leds_off()

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.set_led_rgb(
        led=RvrLedGroups.headlight_right,
        red=255,
        green=0,
        blue=0
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.led_control.set_led_color(
        led=RvrLedGroups.headlight_left,
        color=Colors.green
    )

    # Delay to show LEDs change
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Power - Get Battery State

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to retrieve the battery state of RVR.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    battery_percentage = await rvr.get_battery_percentage()
    print('Battery percentage: ', battery_percentage)

    battery_voltage_state = await rvr.get_battery_voltage_state()
    print('Voltage state: ', battery_voltage_state)

    state_info = '[{}, {}, {}, {}]'.format(
        '{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
        '{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
        '{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
        '{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
    )
    print('Voltage states: ', state_info)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Power - Set Battery State Notifications

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def battery_voltage_state_change_handler(battery_voltage_state):
    print('Battery voltage state: ', battery_voltage_state)


async def main():
    """ This program demonstrates how to enable battery state change notifications.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)
    await rvr.enable_battery_voltage_state_change_notify(is_enabled=True)

    print('Waiting for battery notification...')

    # The asyncio loop will run forever to give the aforementioned events time to occur


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Power - Get Battery State with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def main():
    """ This program demonstrates how to retrieve the battery state of RVR and print it to the console.
        Echo can be used to check to see if RVR is connected and awake.  In order to test it, a node.js
        server must be running on the raspberry-pi connected to RVR.  This code is meant to be executed
        from a separate computer.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    battery_percentage = await rvr.get_battery_percentage()
    print('Battery percentage: ', battery_percentage)

    battery_voltage_state = await rvr.get_battery_voltage_state()
    print('Voltage state: ', battery_voltage_state)

    state_info = '[{}, {}, {}, {}]'.format(
        '{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
        '{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
        '{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
        '{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
    )
    print('Voltage states: ', state_info)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Power - Sleep Monitoring

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def will_sleep_handler():
    print('RVR is about to sleep...')

    # here we could issue a command to RVR, e.g. wake() such that the sleep timer is reset


async def did_sleep_handler():
    print('RVR is asleep...')


async def main():
    """ This program demonstrates how to register handlers for a) the event received 10 seconds
        before RVR will sleep unless some new command is issued and b) the event received
        when RVR does go to sleep.

        Note that these notifications are received without the need to enable them on the robot.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.on_will_sleep_notify(handler=will_sleep_handler)
    await rvr.on_did_sleep_notify(handler=did_sleep_handler)

    # The asyncio loop will run forever to give the aforementioned events time to occur


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Sensor Streaming - Change Stream Settings

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def imu_handler(imu_data):
    print('IMU data response: ', imu_data)


async def ambient_light_handler(ambient_light_data):
    print('Ambient data response:', ambient_light_data)


async def velocity_handler(velocity_data):
    print('Velocity data response:', velocity_data)


async def main():
    """ This program demonstrates how to update sensor streaming parameters at runawait asyncio.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    print('----------', 'Enabling IMU at 100 ms', '----------')

    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.imu,
        handler=imu_handler
    )
    await rvr.sensor_control.start(interval=100)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Updating interval to 1000 ms', '----------')

    await rvr.sensor_control.stop()
    await rvr.sensor_control.start(interval=1000)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Adding ambient light and velocity sensor streams', '----------')

    await rvr.sensor_control.stop()
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.ambient_light,
        handler=ambient_light_handler
    )
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.velocity,
        handler=velocity_handler
    )
    await rvr.sensor_control.start(interval=1000)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')

    await rvr.sensor_control.stop()
    await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
    await rvr.sensor_control.start(interval=100)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Clearing all services', '----------')

    await rvr.sensor_control.clear()

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.sensor_control.clear(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Sensor Streaming - Change Stream Settings with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010
    )
)


async def imu_handler(imu_data):
    print('IMU data response: ', imu_data)


async def ambient_light_handler(ambient_light_data):
    print('Ambient data response:', ambient_light_data)


async def velocity_handler(velocity_data):
    print('Velocity data response:', velocity_data)


async def main():
    """ This program has RVR drive around in different directions using the function raw_motors.

        Note:
            To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
            go on and execute all the statements and exit without the driving ever taking place.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    print('----------', 'Enabling IMU at 100 ms', '----------')

    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.imu,
        handler=imu_handler
    )
    await rvr.sensor_control.start(interval=100)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Updating interval to 1000 ms', '----------')

    await rvr.sensor_control.stop()
    await rvr.sensor_control.start(interval=1000)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Adding ambient light and velocity sensor streams', '----------')

    await rvr.sensor_control.stop()
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.ambient_light,
        handler=ambient_light_handler
    )
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.velocity,
        handler=velocity_handler
    )
    await rvr.sensor_control.start(interval=1000)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')

    await rvr.sensor_control.stop()
    await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
    await rvr.sensor_control.start(interval=100)

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(5)

    print('----------', 'Clearing all services', '----------')

    await rvr.sensor_control.clear()

    # Delay to allow RVR to stream sensor data
    await asyncio.sleep(1)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.sensor_control.clear(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Sensor Streaming - Multi Sensor Stream

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def imu_handler(imu_data):
    print('IMU data response: ', imu_data)


async def color_detected_handler(color_detected_data):
    print('Color detection data response: ', color_detected_data)


async def accelerometer_handler(accelerometer_data):
    print('Accelerometer data response: ', accelerometer_data)


async def ambient_light_handler(ambient_light_data):
    print('Ambient light data response: ', ambient_light_data)


async def main():
    """ This program demonstrates how to enable multiple sensors to stream.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.imu,
        handler=imu_handler
    )
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.color_detection,
        handler=color_detected_handler
    )
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.accelerometer,
        handler=accelerometer_handler
    )
    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.ambient_light,
        handler=ambient_light_handler
    )

    await rvr.sensor_control.start(interval=250)

    # The asyncio loop will run forever to allow infinite streaming.


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.sensor_control.clear(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

Sensor Streaming - Single Sensor Stream

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def accelerometer_handler(accelerometer_data):
    print('Accelerometer data response: ', accelerometer_data)


async def main():
    """ This program demonstrates how to enable a single sensor to stream.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    await rvr.sensor_control.add_sensor_data_handler(
        service=RvrStreamingServices.accelerometer,
        handler=accelerometer_handler
    )

    await rvr.sensor_control.start(interval=250)

    # The asyncio loop will run forever to allow infinite streaming.


if __name__ == '__main__':
    try:
        asyncio.ensure_future(
            main()
        )
        loop.run_forever()

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            asyncio.gather(
                rvr.sensor_control.clear(),
                rvr.close()
            )
        )

    finally:
        if loop.is_running():
            loop.close()

System - Get Main App Version

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=SerialAsyncDal(
        loop
    )
)


async def main():
    """ This program demonstrates how to obtain the firmware version for a specific processor.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)
    print('Nordic main application version (target 1): ', nordic_main_application_version)

    st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)
    print('ST main application version (target 2): ', st_main_application_version)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

System - Get Main App Version with REST API

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SpheroRvrTargets
from sphero_sdk import RestfulAsyncDal


loop = asyncio.get_event_loop()

rvr = SpheroRvrAsync(
    dal=RestfulAsyncDal(
        domain='10.211.2.21',  # Add your raspberry-pi's IP address here
        port=2010  # The port opened by the npm server is always 2010
    )
)


async def main():
    """ This program demonstrates how to obtain the firmware version for a specific processor.
        In order to test it, a node.js server must be running on the raspberry-pi connected to RVR.
        This code is meant to be executed from a separate computer.
    """

    await rvr.wake()

    # Give RVR time to wake up
    await asyncio.sleep(2)

    nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)
    print('Nordic main application version (target 1): ', nordic_main_application_version)

    st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)
    print('ST main application version (target 2): ', st_main_application_version)

    await rvr.close()


if __name__ == '__main__':
    try:
        loop.run_until_complete(
            main()
        )

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

        loop.run_until_complete(
            rvr.close()
        )

    finally:
        if loop.is_running():
            loop.close()

Observer

API and Shell (Pinging)

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import SpheroRvrTargets


rvr = SpheroRvrObserver()


def echo_handler(echo_response):
    print('Echo response: ', echo_response)


def main():
    """ This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
        the same data. Echo can be used to check to see if RVR is connected and awake.
    """

    rvr.wake()

    # Give RVR time to wake up
    time.sleep(2)

    rvr.echo(
        data=[0, 1, 2],
        handler=echo_handler,
        target=SpheroRvrTargets.primary.value
    )

    # Give RVR time to respond
    time.sleep(1)

    rvr.echo(
        data=[3, 4, 5],
        handler=echo_handler,
        target=SpheroRvrTargets.secondary.value
    )

    # Sleep for one second such that RVR has time to send data back
    time.sleep(1)

    rvr.close()


if __name__ == '__main__':
    main()

Color Sensor (Color Detection)

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices


rvr = SpheroRvrObserver()


def color_detected_handler(color_detected_data):
    print('Color detection data response: ', color_detected_data)


def main():
    """ This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
        to report colors detected.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.enable_color_detection(is_enabled=True)
        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.color_detection,
            handler=color_detected_handler
        )
        rvr.sensor_control.start(interval=250)

        # Allow this program to run for 10 seconds
        time.sleep(10)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.sensor_control.clear()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)
        
        rvr.close()


if __name__ == '__main__':
    main()

Drive - Raw Motors

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RawMotorModesEnum


rvr = SpheroRvrObserver()


def main():
    """ This program has RVR drive around in different directions.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.reset_yaw()

        rvr.raw_motors(
            left_mode=RawMotorModesEnum.forward.value,
            left_speed=128,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.forward.value,
            right_speed=128  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.raw_motors(
            left_mode=RawMotorModesEnum.reverse.value,
            left_speed=64,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.reverse.value,
            right_speed=64  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.raw_motors(
            left_mode=RawMotorModesEnum.reverse.value,
            left_speed=128,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.forward.value,
            right_speed=128  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.raw_motors(
            left_mode=RawMotorModesEnum.forward.value,
            left_speed=128,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.forward.value,
            right_speed=128  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.raw_motors(
            left_mode=RawMotorModesEnum.off.value,
            left_speed=0,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.off.value,
            right_speed=0  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Drive with Heading

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import DriveFlagsBitmask


rvr = SpheroRvrObserver()


def main():
    """ This program has RVR drive around in different directions using the function drive_with_heading.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.reset_yaw()

        rvr.drive_with_heading(
            speed=128,  # Valid speed values are 0-255
            heading=0,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.none.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_with_heading(
            speed=128,  # Valid speed values are 0-255
            heading=0,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.drive_reverse.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_with_heading(
            speed=128,  # Valid speed values are 0-255
            heading=90,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.none.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_with_heading(
            speed=128,  # Valid speed values are 0-255
            heading=270,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.none.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_with_heading(
            speed=0,  # Valid heading values are 0-359
            heading=0,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.none.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Drive with Heading - Reverse Mode

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import DriveFlagsBitmask


rvr = SpheroRvrObserver()


def main():
    """ This program has RVR drive around in different directions using the function drive_with_heading.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.reset_yaw()

        rvr.drive_with_heading(
            speed=128,  # Valid speed values are 0-255
            heading=90,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.drive_reverse.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_with_heading(
            speed=0,  # Valid heading values are 0-359
            heading=0,  # Valid heading values are 0-359
            flags=DriveFlagsBitmask.none.value
        )

        # Delay to allow RVR to drive
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Drive with Helper

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver


rvr = SpheroRvrObserver()


def main():
    """ This program has RVR drive with how to drive RVR using the drive control helper.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.drive_control.reset_heading()

        rvr.drive_control.drive_forward_seconds(
            speed=64,
            heading=0,  # Valid heading values are 0-359
            time_to_drive=1
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_control.drive_backward_seconds(
            speed=64,
            heading=0,  # Valid heading values are 0-359
            time_to_drive=1
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_control.turn_left_degrees(
            heading=0,  # Valid heading values are 0-359
            amount=90
        )

        # Delay to allow RVR to drive
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Drive with Helper - Roll

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver


rvr = SpheroRvrObserver()


def main():
    """ This program has RVR drive with how to drive RVR using the drive control helper.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.drive_control.reset_heading()

        rvr.drive_control.roll_start(
            speed=64,
            heading=90
        )

        # Delay to allow RVR to drive
        time.sleep(1)

        rvr.drive_control.roll_stop(heading=270)

        # Delay to allow RVR to drive
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Infrared - Broadcast IR

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum


rvr = SpheroRvrObserver()


def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.start_robot_to_robot_infrared_broadcasting(
            far_code=InfraredCodes.one.value,
            near_code=InfraredCodes.zero.value
        )

        for i in range(2):
            rvr.raw_motors(
                left_mode=RawMotorModesEnum.forward.value,
                left_speed=64,  # Valid speed values are 0-255
                right_mode=RawMotorModesEnum.forward.value,
                right_speed=64  # Valid speed values are 0-255
            )

            # Delay to allow RVR to drive
            time.sleep(2)

    except:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.stop_robot_to_robot_infrared_broadcasting()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)

        rvr.close()


if __name__ == '__main__':
    main()

Infrared - Broadcast IR with Helper

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum


rvr = SpheroRvrObserver()


def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.infrared_control.start_robot_to_robot_infrared_broadcasting(
            far_code=InfraredCodes.one.value,
            near_code=InfraredCodes.zero.value
        )

        rvr.raw_motors(
            left_mode=RawMotorModesEnum.forward.value,
            left_speed=64,  # Valid speed values are 0-255
            right_mode=RawMotorModesEnum.forward.value,
            right_speed=64  # Valid speed values are 0-255
        )

        # Delay to allow RVR to drive
        time.sleep(4)

    except:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.stop_robot_to_robot_infrared_broadcasting()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)

        rvr.close()


if __name__ == '__main__':
    main()

Infrared - Listen For and Send IR

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver


rvr = SpheroRvrObserver()


def infrared_message_received_handler(infrared_message):
    print('Infrared message response: ', infrared_message)


def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)

        rvr.enable_robot_infrared_message_notify(is_enabled=True)

        infrared_code = 3
        strength = 64

        for _ in range(20):
            rvr.send_infrared_message(
                infrared_code=infrared_code,
                front_strength=strength,
                left_strength=strength,
                right_strength=strength,
                rear_strength=strength
            )

            print('Infrared message sent with code: {0}'.format(infrared_code))

            time.sleep(2)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.stop_robot_to_robot_infrared_broadcasting()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)
        
        rvr.close()


if __name__ == '__main__':
    main()

Infrared - Listen For and Send IR with Helper

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import InfraredCodes


rvr = SpheroRvrObserver()


def infrared_message_received_handler(infrared_message):
    print('Infrared message response: ', infrared_message)


def main():
    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)

        codes = [
            InfraredCodes.zero,
            InfraredCodes.one,
            InfraredCodes.two,
            InfraredCodes.three
        ]

        for _ in range(20):
            rvr.infrared_control.send_infrared_messages(
                messages=codes,
                strength=64
            )

            print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))

            time.sleep(2)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.stop_robot_to_robot_infrared_broadcasting()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)
        
        rvr.close()


if __name__ == '__main__':
    main()

LEDs - Set All LEDs

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups


rvr = SpheroRvrObserver()


def main():
    """ This program demonstrates how to set the all the LEDs.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.set_all_leds(
            led_group=RvrLedGroups.all_lights.value,
            led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
        )

        # Delay to show LEDs change
        time.sleep(1)

        rvr.set_all_leds(
            led_group=RvrLedGroups.all_lights.value,
            led_brightness_values=[color for _ in range(10) for color in [255, 0, 0]]
        )

        # Delay to show LEDs change
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

LEDs - Set All LEDs with Helper

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors


rvr = SpheroRvrObserver()


def main():
    """ This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.led_control.turn_leds_off()

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.set_all_leds_color(color=Colors.yellow)

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.turn_leds_off()

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)

        # Delay to show LEDs change
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()

if __name__ == '__main__':
    main()

LEDs - Set Multiple LEDs

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups


rvr = SpheroRvrObserver()


def main():
    """ This program demonstrates how to set multiple LEDs.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.set_all_leds(
            led_group=RvrLedGroups.all_lights.value,
            led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
        )

        # Delay to show LEDs change
        time.sleep(1)

        rvr.set_all_leds(
            led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,
            led_brightness_values=[
                255, 0, 0,
                0, 0, 255
            ]
        )

        # Delay to show LEDs change
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

LEDs - Set Multiple LEDs with Helper

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups


rvr = SpheroRvrObserver()


def main():
    """ This program demonstrates how to set multiple LEDs on RVR using the LED control helper.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.led_control.turn_leds_off()

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.set_multiple_leds_with_enums(
            leds=[
                RvrLedGroups.headlight_left,
                RvrLedGroups.headlight_right
            ],
            colors=[
                Colors.green,
                Colors.blue
            ]
        )

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.set_multiple_leds_with_rgb(
            leds=[
                RvrLedGroups.headlight_left,
                RvrLedGroups.headlight_right
            ],
            colors=[
                255, 0, 0,
                0, 255, 0
            ]
        )

        # Delay to show LEDs change
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

LEDs - Set Single LED

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups


rvr = SpheroRvrObserver()


def main():
    """ This program demonstrates how to set a single LED.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.set_all_leds(
            led_group=RvrLedGroups.all_lights.value,
            led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
        )

        # Delay to show LEDs change
        time.sleep(1)

        rvr.set_all_leds(
            led_group=RvrLedGroups.headlight_right.value,   # 0xe00
            led_brightness_values=[255, 0, 0]
        )

        # Delay to show LEDs change
        time.sleep(1)

        rvr.set_all_leds(
            led_group=RvrLedGroups.headlight_left.value,      # 0x1c0
            led_brightness_values=[0, 255, 0]
        )

        # Delay to show LEDs change
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

LEDs - Set Single LED with Helper

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups


rvr = SpheroRvrObserver()


def main():
    """ This program demonstrates how to set a single LED on RVR using the LED control helper.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.led_control.turn_leds_off()

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.set_led_rgb(
            led=RvrLedGroups.headlight_right,
            red=255,
            green=0,
            blue=0
        )

        # Delay to show LEDs change
        time.sleep(1)

        rvr.led_control.set_led_color(
            led=RvrLedGroups.headlight_left,
            color=Colors.green
        )

        # Delay to show LEDs change
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Power - Get Battery State

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates


rvr = SpheroRvrObserver()


def battery_percentage_handler(battery_percentage):
    print('Battery percentage: ', battery_percentage)


def battery_voltage_handler(battery_voltage_state):
    print('Voltage state: ', battery_voltage_state)

    state_info = '[{}, {}, {}, {}]'.format(
        '{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),
        '{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),
        '{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),
        '{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)
    )
    print('Voltage states: ', state_info)


def main():
    """ This program demonstrates how to retrieve the battery state of RVR.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.get_battery_percentage(handler=battery_percentage_handler)

        # Sleep for one second such that RVR has time to send data back
        time.sleep(1)

        rvr.get_battery_voltage_state(handler=battery_voltage_handler)

        # Sleep for one second such that RVR has time to send data back
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Power - Get Battery State Notifications

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver


rvr = SpheroRvrObserver()


def battery_voltage_state_change_handler(battery_voltage_state):
    print('Battery voltage state: ', battery_voltage_state)


def main():
    """ This program demonstrates how to enable battery state change notifications.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)
        rvr.enable_battery_voltage_state_change_notify(is_enabled=True)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Power - Sleep Monitoring

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver


rvr = SpheroRvrObserver()


def will_sleep_handler():
    print('RVR is about to sleep...')

    # here we could issue a command to RVR, e.g. wake() such that the sleep timer is reset


def did_sleep_handler():
    print('RVR is asleep...')


def main():
    """ This program demonstrates how to register handlers for a) the event received 10 seconds
        before RVR will sleep unless some new command is issued and b) the event received
        when RVR does go to sleep.

        Note that these notifications are received without the need to enable them on the robot.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.on_will_sleep_notify(will_sleep_handler)

        rvr.on_did_sleep_notify(did_sleep_handler)

        # Sleep for 310 seconds such that we see the aforementioned events have time to occur
        time.sleep(310)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()

Sensor Streaming - Change Stream Settings

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices


rvr = SpheroRvrObserver()


def imu_handler(imu_data):
    print('IMU data response: ', imu_data)


def ambient_light_handler(ambient_light_data):
    print('Ambient data response:', ambient_light_data)


def velocity_handler(velocity_data):
    print('Velocity data response:', velocity_data)


def main():
    """ This program demonstrates how to update sensor streaming parameters at runtime.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        print('----------', 'Enabling IMU at 100 ms', '----------')

        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.imu,
            handler=imu_handler
        )
        rvr.sensor_control.start(interval=100)

        # Delay to allow RVR to stream sensor data
        time.sleep(5)

        print('----------', 'Updating interval to 1000 ms', '----------')

        rvr.sensor_control.stop()
        rvr.sensor_control.start(interval=1000)

        # Delay to allow RVR to stream sensor data
        time.sleep(5)

        print('----------', 'Adding ambient light and velocity sensor streams', '----------')

        rvr.sensor_control.stop()
        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.ambient_light,
            handler=ambient_light_handler
        )
        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.velocity,
            handler=velocity_handler
        )
        rvr.sensor_control.start(interval=1000)

        # Delay to allow RVR to stream sensor data
        time.sleep(5)

        print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')

        rvr.sensor_control.stop()
        rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)
        rvr.sensor_control.start(interval=100)

        # Delay to allow RVR to stream sensor data
        time.sleep(5)

        print('----------', 'Clearing all services', '----------')

        rvr.sensor_control.clear()

        # Delay to allow RVR to stream sensor data
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        if len(rvr.sensor_control.enabled_sensors) > 0:
            rvr.sensor_control.clear()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)

        rvr.close()


if __name__ == '__main__':
    main()

Sensor Streaming - Multi Sensor Stream

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices


rvr = SpheroRvrObserver()


def imu_handler(imu_data):
    print('IMU data response: ', imu_data)


def color_detected_handler(color_detected_data):
    print('Color detection data response: ', color_detected_data)


def accelerometer_handler(accelerometer_data):
    print('Accelerometer data response: ', accelerometer_data)


def ambient_light_handler(ambient_light_data):
    print('Ambient light data response: ', ambient_light_data)


def main():
    """ This program demonstrates how to enable multiple sensors to stream.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.imu,
            handler=imu_handler
        )
        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.color_detection,
            handler=color_detected_handler
        )
        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.accelerometer,
            handler=accelerometer_handler
        )
        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.ambient_light,
            handler=ambient_light_handler
        )

        rvr.sensor_control.start(interval=250)

        while True:
            # Delay to allow RVR to stream sensor data
            time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.sensor_control.clear()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)
        
        rvr.close()


if __name__ == '__main__':
    main()

Sensor Streaming - Single Sensor Stream

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import RvrStreamingServices


rvr = SpheroRvrObserver()


def accelerometer_handler(accelerometer_data):
    print('Accelerometer data response: ', accelerometer_data)


def main():
    """ This program demonstrates how to enable a single sensor to stream.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.sensor_control.add_sensor_data_handler(
            service=RvrStreamingServices.accelerometer,
            handler=accelerometer_handler
        )

        rvr.sensor_control.start(interval=250)

        while True:
            # Delay to allow RVR to stream sensor data
            time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.sensor_control.clear()

        # Delay to allow RVR issue command before closing
        time.sleep(.5)
        
        rvr.close()


if __name__ == '__main__':
    main()

System (Get Main App Version)

import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))

from sphero_sdk import SpheroRvrObserver
from sphero_sdk import SpheroRvrTargets


rvr = SpheroRvrObserver()


def get_nordic_main_application_version_handler(nordic_main_application_version):
    print('Nordic main application version (target 1): ', nordic_main_application_version)


def get_st_main_application_version_handler(st_main_application_version):
    print('ST main application version (target 2): ', st_main_application_version)


def main():
    """ This program demonstrates how to obtain the firmware version for a specific processor.
    """

    try:
        rvr.wake()

        # Give RVR time to wake up
        time.sleep(2)

        rvr.get_main_application_version(
            handler=get_nordic_main_application_version_handler,
            target=SpheroRvrTargets.primary.value
        )

        # Sleep for one second such that RVR has time to send data back
        time.sleep(1)

        rvr.get_main_application_version(
            handler=get_st_main_application_version_handler,
            target=SpheroRvrTargets.secondary.value
        )

        # Sleep for one second such that RVR has time to send data back
        time.sleep(1)

    except KeyboardInterrupt:
        print('\nProgram terminated with keyboard interrupt.')

    finally:
        rvr.close()


if __name__ == '__main__':
    main()
  • Asynchronous
    • API and Shell (Pinging)
    • API and Shell (Pinging) with REST API
    • Color Sensor (Color Detection)
    • Color Sensor (Color Detection) with REST API
    • Drive - Raw Motors
    • Drive with Heading
    • Drive with Heading - Reverse Mode
    • Drive with Helper
    • Drive with Helper - Roll
    • Drive with REST API
    • Infrared - Broadcast IR
    • Infrared - Bradcast IR with Helper
    • Infrared - Listen For and Send IR
    • Infrared - Listen For and Send IR with Helper
    • Infrared - Listen For and Send IR with REST API
    • LEDs - Set All LEDs
    • LEDs - Set All LEDs with Helper
    • LEDs - Set Multiple LEDs
    • LEDs - Set Multiple LEDs with Helper
    • LEDs - Set Multiple LEDs with REST API
    • LEDs - Set Single LED
    • LEDs - Set Single LED with Helper
    • Power - Get Battery State
    • Power - Set Battery State Notifications
    • Power - Get Battery State with REST API
    • Power - Sleep Monitoring
    • Sensor Streaming - Change Stream Settings
    • Sensor Streaming - Change Stream Settings with REST API
    • Sensor Streaming - Multi Sensor Stream
    • Sensor Streaming - Single Sensor Stream
    • System - Get Main App Version
    • System - Get Main App Version with REST API
  • Observer
    • API and Shell (Pinging)
    • Color Sensor (Color Detection)
    • Drive - Raw Motors
    • Drive with Heading
    • Drive with Heading - Reverse Mode
    • Drive with Helper
    • Drive with Helper - Roll
    • Infrared - Broadcast IR
    • Infrared - Broadcast IR with Helper
    • Infrared - Listen For and Send IR
    • Infrared - Listen For and Send IR with Helper
    • LEDs - Set All LEDs
    • LEDs - Set All LEDs with Helper
    • LEDs - Set Multiple LEDs
    • LEDs - Set Multiple LEDs with Helper
    • LEDs - Set Single LED
    • LEDs - Set Single LED with Helper
    • Power - Get Battery State
    • Power - Get Battery State Notifications
    • Power - Sleep Monitoring
    • Sensor Streaming - Change Stream Settings
    • Sensor Streaming - Multi Sensor Stream
    • Sensor Streaming - Single Sensor Stream
    • System (Get Main App Version)
Sphero Public SDK

Wanna stay informed about Public SDK efforts and releases?

Sphero
Public SDK LicenseAbout Sphero Public SDKSphero.comSphero EDU
Community
Sphero CommunityThingiverseTwitterInstagramFacebook
More
RedditLinkedInGitHubStar
Copyright © 2022 Sphero