How To Use the Raspberry Pi Python SDK

Where to get the code

Pull down the newest Python SDK from the GitHub repo!

You can follow our Getting Started with Raspberry Pi instructions for more details.

Asynchronous

API and Shell (Pinging)

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import SpheroRvrTargets



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to use the echo command, which sends data to RVR and RVR returns

        the same data. Echo can be used to check to see if RVR is connected and awake.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    echo_response = await rvr.echo(

        data=[0, 1, 2],

        target=SpheroRvrTargets.primary.value

    )

    print('Echo response 1: ', echo_response)


    echo_response = await rvr.echo(

        data=[4, 5, 6],

        target=SpheroRvrTargets.secondary.value

    )

    print('Echo response 2: ', echo_response)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


API and Shell (Pinging) with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import SpheroRvrTargets



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program demonstrates how to use the echo command, which sends data to RVR and has RVR returns

        the same data. Echo can be used to check to see if RVR is connected and awake.  In order to test it,

        a node.js server must be running on the raspberry-pi connected to RVR.  This code is meant to be

        executed from a separate computer.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    echo_response = await rvr.echo(

        data=[0, 1, 2],

        target=SpheroRvrTargets.primary.value

    )

    print('Echo response 1: ', echo_response)


    echo_response = await rvr.echo(

        data=[4, 5, 6],

        target=SpheroRvrTargets.secondary.value

    )

    print('Echo response 2: ', echo_response)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Color Sensor (Color Detection)

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



async def main():

    """ This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)

        to report colors detected. To exit program, press <CTRL-C>

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.enable_color_detection(is_enabled=True)

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.color_detection,

        handler=color_detected_handler

    )

    await rvr.sensor_control.start(interval=250)


    while True:

        await asyncio.sleep(1)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.enable_color_detection(is_enabled=False),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Color Sensor (Color Detection) with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



async def main():

    """ This program uses the color sensor on RVR (located on the down side of RVR, facing the floor) to report colors detected.

        To exit program, press <CTRL-C>

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.enable_color_detection(is_enabled=True)

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.color_detection,

        handler=color_detected_handler

    )

    await rvr.sensor_control.start(interval=250)


    while True:

        await asyncio.sleep(1)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.enable_color_detection(is_enabled=False),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()

Drive - Raw Motors

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RawMotorModesEnum



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive around in different directions.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.forward.value,

        left_speed=128# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=128  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.reverse.value,

        left_speed=64# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.reverse.value,

        right_speed=64  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.reverse.value,

        left_speed=128# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=128  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.forward.value,

        left_speed=128# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=128  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.off.value,

        left_speed=0# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.off.value,

        right_speed=0  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Heading

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import DriveFlagsBitmask



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive around in different directions using the function drive_with_heading.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.drive_reverse.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=90# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=270# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=0# Valid heading values are 0-359

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Heading - Reverse Mode

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import DriveFlagsBitmask



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive around in different directions using the function drive_with_heading.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=90# Valid heading values are 0-359

        flags=DriveFlagsBitmask.drive_reverse.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=0# Valid heading values are 0-359

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive with how to drive RVR using the drive control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.drive_control.reset_heading()


    await rvr.drive_control.drive_forward_seconds(

        speed=64,

        heading=0# Valid heading values are 0-359

        time_to_drive=1

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_control.drive_backward_seconds(

        speed=64,

        heading=0# Valid heading values are 0-359

        time_to_drive=1

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_control.turn_left_degrees(

        heading=0# Valid heading values are 0-359

        amount=90

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Helper - Roll

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive with how to drive RVR using the drive control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.drive_control.reset_heading()


    await rvr.drive_control.roll_start(

        speed=64,

        heading=90

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_control.roll_stop(heading=270)


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import DriveFlagsBitmask



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program has RVR drive using the Rest API.  In order to test it, a node.js server must be running on the

        raspberry-pi connected to RVR.  This code is meant to be executed from a separate computer.


        Note:

            To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would

            go on and execute all the statements and exit without the driving ever taking place.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.drive_reverse.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=90# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=270# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=0# Valid heading values are 0-359

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()

Infrared - Broadcast IR

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import InfraredCodes

from sphero_sdk import RawMotorModesEnum



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.start_robot_to_robot_infrared_broadcasting(

        far_code=InfraredCodes.one.value,

        near_code=InfraredCodes.zero.value

    )


    for i in range(2):

        await rvr.raw_motors(

            left_mode=RawMotorModesEnum.forward.value,

            left_speed=64# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.forward.value,

            right_speed=64  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        await asyncio.sleep(2)


    await rvr.stop_robot_to_robot_infrared_broadcasting()


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Broadcast IR with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import InfraredCodes

from sphero_sdk import RawMotorModesEnum



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.infrared_control.start_infrared_broadcasting(

        far_code=InfraredCodes.one,

        near_code=InfraredCodes.zero

    )


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.forward.value,

        left_speed=64# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=64  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(4)


    await rvr.infrared_control.stop_infrared_broadcasting()


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Listen For and Send IR

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def infrared_message_received_handler(infrared_message):

    print('Infrared message response: ', infrared_message)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)


    await rvr.enable_robot_infrared_message_notify(is_enabled=True)


    infrared_code = 3

    strength = 64


    while True:

        await rvr.send_infrared_message(

            infrared_code=infrared_code,

            front_strength=strength,

            left_strength=strength,

            right_strength=strength,

            rear_strength=strength

        )


        print('Infrared message sent with code: {0}'.format(infrared_code))


        await asyncio.sleep(2)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Listen For and Send IR with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import InfraredCodes



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def infrared_message_received_handler(infrared_message):

    print('Infrared message response: ', infrared_message)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)


    codes = [

        InfraredCodes.zero,

        InfraredCodes.one,

        InfraredCodes.two,

        InfraredCodes.three

    ]


    while True:

        await rvr.infrared_control.send_infrared_messages(

            messages=codes,

            strength=64

        )


        print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))


        await asyncio.sleep(2)



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Listen For and Send IR with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def infrared_message_received_handler(response):

    print('Response data for IR message received:',response)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.


        To try this out, write a script for your other robot that a) broadcasts on the corresponding

        channel that RVR is set to listen to [in this case channel 0] and b) listens on the channel which

        RVR sends messages on [in this case channel 3]

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)


    await rvr.enable_robot_infrared_message_notify(is_enabled=True)


    infrared_code = 3

    strength = 64


    while True:

        await rvr.send_infrared_message(

            infrared_code=infrared_code,

            front_strength=strength,

            left_strength=strength,

            right_strength=strength,

            rear_strength=strength

        )


        print('Infrared message sent with code: {0}'.format(infrared_code))


        await asyncio.sleep(2)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set All LEDs

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set the all the LEDs.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for x in range(10) for color in [0, 255, 0]]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set All LEDs with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set the all the LEDs of RVR using the LED control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.led_control.turn_leds_off()


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_all_leds_color(color=Colors.yellow)


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.turn_leds_off()


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set Multiple LEDs

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set multiple LEDs.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,

        led_brightness_values=[

            0, 0, 255,

            255, 0, 0

        ]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set Multiple LEDs with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set multiple LEDs on RVR using the LED control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.led_control.turn_leds_off()


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_multiple_leds_with_enums(

        leds=[

            RvrLedGroups.headlight_left,

            RvrLedGroups.headlight_right

        ],

        colors=[

            Colors.green,

            Colors.blue

        ]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_multiple_leds_with_rgb(

        leds=[

            RvrLedGroups.headlight_left,

            RvrLedGroups.headlight_right

        ],

        colors=[

            255, 0, 0,

            0, 255, 0

        ]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set Multiple LEDs with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import RestfulAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program demonstrates how to set multiple LEDs.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,

        led_brightness_values=[

            0, 0, 255,

            255, 0, 0

        ]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set Single LED

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set a single LED.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.headlight_right.value,   # 0xe00

        led_brightness_values=[255, 0, 0]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.headlight_left.value,    # 0x1c0

        led_brightness_values=[0, 255, 0]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set Single LED with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set a single LED on RVR using the LED control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.led_control.turn_leds_off()


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_led_rgb(

        led=RvrLedGroups.headlight_right,

        red=255,

        green=0,

        blue=0

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_led_color(

        led=RvrLedGroups.headlight_left,

        color=Colors.green

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

           loop.close()



Power - Get Battery State

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to retrieve the battery state of RVR.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    battery_percentage = await rvr.get_battery_percentage()

    print('Battery percentage: ', battery_percentage)


    battery_voltage_state = await rvr.get_battery_voltage_state()

    print('Voltage state: ', battery_voltage_state)


    state_info = '[{}, {}, {}, {}]'.format(

        '{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),

        '{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),

        '{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),

        '{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)

    )

    print('Voltage states: ', state_info)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Power - Set Battery State Notifications

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def battery_voltage_state_change_handler(battery_voltage_state):

    print('Battery voltage state: ', battery_voltage_state)



async def main():

    """ This program demonstrates how to enable battery state change notifications.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)

    await rvr.enable_battery_voltage_state_change_notify(is_enabled=True)


    print('Waiting for battery notification...')


    # The asyncio loop will run forever to give the aforementioned events time to occur



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Power - Get Battery State with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program demonstrates how to retrieve the battery state of RVR and print it to the console.

        Echo can be used to check to see if RVR is connected and awake.  In order to test it, a node.js

        server must be running on the raspberry-pi connected to RVR.  This code is meant to be executed

        from a separate computer.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    battery_percentage = await rvr.get_battery_percentage()

    print('Battery percentage: ', battery_percentage)


    battery_voltage_state = await rvr.get_battery_voltage_state()

    print('Voltage state: ', battery_voltage_state)


    state_info = '[{}, {}, {}, {}]'.format(

        '{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),

        '{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),

        '{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),

        '{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)

    )

    print('Voltage states: ', state_info)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Power - Sleep Monitoring

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def will_sleep_handler():

    print('RVR is about to sleep...')


    # here we could issue a command to RVR, e.g. wake() such that the sleep timer is reset



async def did_sleep_handler():

    print('RVR is asleep...')



async def main():

    """ This program demonstrates how to register handlers for a) the event received 10 seconds

        before RVR will sleep unless some new command is issued and b) the event received

        when RVR does go to sleep.


        Note that these notifications are received without the need to enable them on the robot.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.on_will_sleep_notify(handler=will_sleep_handler)

    await rvr.on_did_sleep_notify(handler=did_sleep_handler)


    # The asyncio loop will run forever to give the aforementioned events time to occur



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Sensor Streaming - Change Stream Settings

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def imu_handler(imu_data):

    print('IMU data response: ', imu_data)



async def ambient_light_handler(ambient_light_data):

    print('Ambient data response:', ambient_light_data)



async def velocity_handler(velocity_data):

    print('Velocity data response:', velocity_data)



async def main():

    """ This program demonstrates how to update sensor streaming parameters at runawait asyncio.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    print('----------', 'Enabling IMU at 100 ms', '----------')


    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.imu,

        handler=imu_handler

    )

    await rvr.sensor_control.start(interval=100)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Updating interval to 1000 ms', '----------')


    await rvr.sensor_control.stop()

    await rvr.sensor_control.start(interval=1000)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Adding ambient light and velocity sensor streams', '----------')


    await rvr.sensor_control.stop()

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.ambient_light,

        handler=ambient_light_handler

    )

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.velocity,

        handler=velocity_handler

    )

    await rvr.sensor_control.start(interval=1000)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')


    await rvr.sensor_control.stop()

    await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)

    await rvr.sensor_control.start(interval=100)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Clearing all services', '----------')


    await rvr.sensor_control.clear()


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.sensor_control.clear(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Sensor Streaming - Change Stream Settings with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010

    )

)



async def imu_handler(imu_data):

    print('IMU data response: ', imu_data)



async def ambient_light_handler(ambient_light_data):

    print('Ambient data response:', ambient_light_data)



async def velocity_handler(velocity_data):

    print('Velocity data response:', velocity_data)



async def main():

    """ This program has RVR drive around in different directions using the function raw_motors.


        Note:

            To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would

            go on and execute all the statements and exit without the driving ever taking place.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    print('----------', 'Enabling IMU at 100 ms', '----------')


    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.imu,

        handler=imu_handler

    )

    await rvr.sensor_control.start(interval=100)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Updating interval to 1000 ms', '----------')


    await rvr.sensor_control.stop()

    await rvr.sensor_control.start(interval=1000)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Adding ambient light and velocity sensor streams', '----------')


    await rvr.sensor_control.stop()

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.ambient_light,

        handler=ambient_light_handler

    )

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.velocity,

        handler=velocity_handler

    )

    await rvr.sensor_control.start(interval=1000)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')


    await rvr.sensor_control.stop()

    await rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)

    await rvr.sensor_control.start(interval=100)


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(5)


    print('----------', 'Clearing all services', '----------')


    await rvr.sensor_control.clear()


    # Delay to allow RVR to stream sensor data

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.sensor_control.clear(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Sensor Streaming - Multi Sensor Stream

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def imu_handler(imu_data):

    print('IMU data response: ', imu_data)



async def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



async def accelerometer_handler(accelerometer_data):

    print('Accelerometer data response: ', accelerometer_data)



async def ambient_light_handler(ambient_light_data):

    print('Ambient light data response: ', ambient_light_data)



async def main():

    """ This program demonstrates how to enable multiple sensors to stream.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.imu,

        handler=imu_handler

    )

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.color_detection,

        handler=color_detected_handler

    )

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.accelerometer,

        handler=accelerometer_handler

    )

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.ambient_light,

        handler=ambient_light_handler

    )


    await rvr.sensor_control.start(interval=250)


    # The asyncio loop will run forever to allow infinite streaming.



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.sensor_control.clear(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Sensor Streaming - Single Sensor Stream

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def accelerometer_handler(accelerometer_data):

    print('Accelerometer data response: ', accelerometer_data)



async def main():

    """ This program demonstrates how to enable a single sensor to stream.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.accelerometer,

        handler=accelerometer_handler

    )


    await rvr.sensor_control.start(interval=250)


    # The asyncio loop will run forever to allow infinite streaming.



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.sensor_control.clear(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()

System - Get Main App Version

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import SpheroRvrTargets



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to obtain the firmware version for a specific processor.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)

    print('Nordic main application version (target 1): ', nordic_main_application_version)


    st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)

    print('ST main application version (target 2): ', st_main_application_version)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


System - Get Main App Version with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SpheroRvrTargets

from sphero_sdk import RestfulAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program demonstrates how to obtain the firmware version for a specific processor.

        In order to test it, a node.js server must be running on the raspberry-pi connected to RVR.

        This code is meant to be executed from a separate computer.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    nordic_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.primary.value)

    print('Nordic main application version (target 1): ', nordic_main_application_version)


    st_main_application_version = await rvr.get_main_application_version(target=SpheroRvrTargets.secondary.value)

    print('ST main application version (target 2): ', st_main_application_version)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Observer

API and Shell (Pinging)

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import SpheroRvrTargets



rvr = SpheroRvrObserver()



def echo_handler(echo_response):

    print('Echo response: ', echo_response)



def main():

    """ This program demonstrates how to use the echo command, which sends data to RVR and RVR returns

        the same data. Echo can be used to check to see if RVR is connected and awake.

    """


    rvr.wake()


    # Give RVR time to wake up

    time.sleep(2)


    rvr.echo(

        data=[0, 1, 2],

        handler=echo_handler,

        target=SpheroRvrTargets.primary.value

    )


    # Give RVR time to respond

    time.sleep(1)


    rvr.echo(

        data=[3, 4, 5],

        handler=echo_handler,

        target=SpheroRvrTargets.secondary.value

    )


    # Sleep for one second such that RVR has time to send data back

    time.sleep(1)


    rvr.close()



if __name__ == '__main__':

    main()


Color Sensor (Color Detection)

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import RvrStreamingServices



rvr = SpheroRvrObserver()



def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



def main():

    """ This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)

        to report colors detected.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.enable_color_detection(is_enabled=True)

        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.color_detection,

            handler=color_detected_handler

        )

        rvr.sensor_control.start(interval=250)


        # Allow this program to run for 10 seconds

        time.sleep(10)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.sensor_control.clear()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)

        

        rvr.close()



if __name__ == '__main__':

    main()


Drive - Raw Motors

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import RawMotorModesEnum



rvr = SpheroRvrObserver()



def main():

    """ This program has RVR drive around in different directions.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.reset_yaw()


        rvr.raw_motors(

            left_mode=RawMotorModesEnum.forward.value,

            left_speed=128# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.forward.value,

            right_speed=128  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.raw_motors(

            left_mode=RawMotorModesEnum.reverse.value,

            left_speed=64# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.reverse.value,

            right_speed=64  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.raw_motors(

            left_mode=RawMotorModesEnum.reverse.value,

            left_speed=128# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.forward.value,

            right_speed=128  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.raw_motors(

            left_mode=RawMotorModesEnum.forward.value,

            left_speed=128# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.forward.value,

            right_speed=128  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.raw_motors(

            left_mode=RawMotorModesEnum.off.value,

            left_speed=0# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.off.value,

            right_speed=0  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Drive with Heading

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import DriveFlagsBitmask



rvr = SpheroRvrObserver()



def main():

    """ This program has RVR drive around in different directions using the function drive_with_heading.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.reset_yaw()


        rvr.drive_with_heading(

            speed=128# Valid speed values are 0-255

            heading=0# Valid heading values are 0-359

            flags=DriveFlagsBitmask.none.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_with_heading(

            speed=128# Valid speed values are 0-255

            heading=0# Valid heading values are 0-359

            flags=DriveFlagsBitmask.drive_reverse.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_with_heading(

            speed=128# Valid speed values are 0-255

            heading=90# Valid heading values are 0-359

            flags=DriveFlagsBitmask.none.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_with_heading(

            speed=128# Valid speed values are 0-255

            heading=270# Valid heading values are 0-359

            flags=DriveFlagsBitmask.none.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_with_heading(

            speed=0# Valid heading values are 0-359

            heading=0# Valid heading values are 0-359

            flags=DriveFlagsBitmask.none.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Drive with Heading - Reverse Mode

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import DriveFlagsBitmask



rvr = SpheroRvrObserver()



def main():

    """ This program has RVR drive around in different directions using the function drive_with_heading.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.reset_yaw()


        rvr.drive_with_heading(

            speed=128# Valid speed values are 0-255

            heading=90# Valid heading values are 0-359

            flags=DriveFlagsBitmask.drive_reverse.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_with_heading(

            speed=0# Valid heading values are 0-359

            heading=0# Valid heading values are 0-359

            flags=DriveFlagsBitmask.none.value

        )


        # Delay to allow RVR to drive

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Drive with Helper

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver



rvr = SpheroRvrObserver()



def main():

    """ This program has RVR drive with how to drive RVR using the drive control helper.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.drive_control.reset_heading()


        rvr.drive_control.drive_forward_seconds(

            speed=64,

            heading=0# Valid heading values are 0-359

            time_to_drive=1

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_control.drive_backward_seconds(

            speed=64,

            heading=0# Valid heading values are 0-359

            time_to_drive=1

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_control.turn_left_degrees(

            heading=0# Valid heading values are 0-359

            amount=90

        )


        # Delay to allow RVR to drive

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Drive with Helper - Roll

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver



rvr = SpheroRvrObserver()



def main():

    """ This program has RVR drive with how to drive RVR using the drive control helper.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.drive_control.reset_heading()


        rvr.drive_control.roll_start(

            speed=64,

            heading=90

        )


        # Delay to allow RVR to drive

        time.sleep(1)


        rvr.drive_control.roll_stop(heading=270)


        # Delay to allow RVR to drive

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()

Infrared - Broadcast IR

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import InfraredCodes

from sphero_sdk import RawMotorModesEnum



rvr = SpheroRvrObserver()



def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.start_robot_to_robot_infrared_broadcasting(

            far_code=InfraredCodes.one.value,

            near_code=InfraredCodes.zero.value

        )


        for i in range(2):

            rvr.raw_motors(

                left_mode=RawMotorModesEnum.forward.value,

                left_speed=64# Valid speed values are 0-255

                right_mode=RawMotorModesEnum.forward.value,

                right_speed=64  # Valid speed values are 0-255

            )


            # Delay to allow RVR to drive

            time.sleep(2)


    except:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.stop_robot_to_robot_infrared_broadcasting()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)


        rvr.close()



if __name__ == '__main__':

    main()


Infrared - Broadcast IR with Helper

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import InfraredCodes

from sphero_sdk import RawMotorModesEnum



rvr = SpheroRvrObserver()



def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.infrared_control.start_robot_to_robot_infrared_broadcasting(

            far_code=InfraredCodes.one.value,

            near_code=InfraredCodes.zero.value

        )


        rvr.raw_motors(

            left_mode=RawMotorModesEnum.forward.value,

            left_speed=64# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.forward.value,

            right_speed=64  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        time.sleep(4)


    except:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.stop_robot_to_robot_infrared_broadcasting()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)


        rvr.close()



if __name__ == '__main__':

    main()


Infrared - Listen For and Send IR

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver



rvr = SpheroRvrObserver()



def infrared_message_received_handler(infrared_message):

    print('Infrared message response: ', infrared_message)



def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)


        rvr.enable_robot_infrared_message_notify(is_enabled=True)


        infrared_code = 3

        strength = 64


        for _ in range(20):

            rvr.send_infrared_message(

                infrared_code=infrared_code,

                front_strength=strength,

                left_strength=strength,

                right_strength=strength,

                rear_strength=strength

            )


            print('Infrared message sent with code: {0}'.format(infrared_code))


            time.sleep(2)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.stop_robot_to_robot_infrared_broadcasting()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)

        

        rvr.close()



if __name__ == '__main__':

    main()


Infrared - Listen For and Send IR with Helper

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import InfraredCodes



rvr = SpheroRvrObserver()



def infrared_message_received_handler(infrared_message):

    print('Infrared message response: ', infrared_message)



def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)


        codes = [

            InfraredCodes.zero,

            InfraredCodes.one,

            InfraredCodes.two,

            InfraredCodes.three

        ]


        for _ in range(20):

            rvr.infrared_control.send_infrared_messages(

                messages=codes,

                strength=64

            )


            print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))


            time.sleep(2)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.stop_robot_to_robot_infrared_broadcasting()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)

        

        rvr.close()



if __name__ == '__main__':

    main()


LEDs - Set All LEDs

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups



rvr = SpheroRvrObserver()



def main():

    """ This program demonstrates how to set the all the LEDs.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.set_all_leds(

            led_group=RvrLedGroups.all_lights.value,

            led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

        )


        # Delay to show LEDs change

        time.sleep(1)


        rvr.set_all_leds(

            led_group=RvrLedGroups.all_lights.value,

            led_brightness_values=[color for _ in range(10) for color in [255, 0, 0]]

        )


        # Delay to show LEDs change

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


LEDs - Set All LEDs with Helper

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import Colors



rvr = SpheroRvrObserver()



def main():

    """ This program demonstrates how to set the all the LEDs of RVR using the LED control helper.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.led_control.turn_leds_off()


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.set_all_leds_color(color=Colors.yellow)


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.turn_leds_off()


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)


        # Delay to show LEDs change

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()


if __name__ == '__main__':

    main()


LEDs - Set Multiple LEDs

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups



rvr = SpheroRvrObserver()



def main():

    """ This program demonstrates how to set multiple LEDs.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.set_all_leds(

            led_group=RvrLedGroups.all_lights.value,

            led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

        )


        # Delay to show LEDs change

        time.sleep(1)


        rvr.set_all_leds(

            led_group=RvrLedGroups.headlight_left.value | RvrLedGroups.headlight_right.value,

            led_brightness_values=[

                255, 0, 0,

                0, 0, 255

            ]

        )


        # Delay to show LEDs change

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


LEDs - Set Multiple LEDs with Helper

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups



rvr = SpheroRvrObserver()



def main():

    """ This program demonstrates how to set multiple LEDs on RVR using the LED control helper.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.led_control.turn_leds_off()


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.set_multiple_leds_with_enums(

            leds=[

                RvrLedGroups.headlight_left,

                RvrLedGroups.headlight_right

            ],

            colors=[

                Colors.green,

                Colors.blue

            ]

        )


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.set_multiple_leds_with_rgb(

            leds=[

                RvrLedGroups.headlight_left,

                RvrLedGroups.headlight_right

            ],

            colors=[

                255, 0, 0,

                0, 255, 0

            ]

        )


        # Delay to show LEDs change

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


LEDs - Set Single LED

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups



rvr = SpheroRvrObserver()



def main():

    """ This program demonstrates how to set a single LED.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.set_all_leds(

            led_group=RvrLedGroups.all_lights.value,

            led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

        )


        # Delay to show LEDs change

        time.sleep(1)


        rvr.set_all_leds(

            led_group=RvrLedGroups.headlight_right.value,   # 0xe00

            led_brightness_values=[255, 0, 0]

        )


        # Delay to show LEDs change

        time.sleep(1)


        rvr.set_all_leds(

            led_group=RvrLedGroups.headlight_left.value,      # 0x1c0

            led_brightness_values=[0, 255, 0]

        )


        # Delay to show LEDs change

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


LEDs - Set Single LED with Helper

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups



rvr = SpheroRvrObserver()



def main():

    """ This program demonstrates how to set a single LED on RVR using the LED control helper.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.led_control.turn_leds_off()


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.set_led_rgb(

            led=RvrLedGroups.headlight_right,

            red=255,

            green=0,

            blue=0

        )


        # Delay to show LEDs change

        time.sleep(1)


        rvr.led_control.set_led_color(

            led=RvrLedGroups.headlight_left,

            color=Colors.green

        )


        # Delay to show LEDs change

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Power - Get Battery State

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import BatteryVoltageStatesEnum as VoltageStates



rvr = SpheroRvrObserver()



def battery_percentage_handler(battery_percentage):

    print('Battery percentage: ', battery_percentage)



def battery_voltage_handler(battery_voltage_state):

    print('Voltage state: ', battery_voltage_state)


    state_info = '[{}, {}, {}, {}]'.format(

        '{}: {}'.format(VoltageStates.unknown.name, VoltageStates.unknown.value),

        '{}: {}'.format(VoltageStates.ok.name, VoltageStates.ok.value),

        '{}: {}'.format(VoltageStates.low.name, VoltageStates.low.value),

        '{}: {}'.format(VoltageStates.critical.name, VoltageStates.critical.value)

    )

    print('Voltage states: ', state_info)



def main():

    """ This program demonstrates how to retrieve the battery state of RVR.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.get_battery_percentage(handler=battery_percentage_handler)


        # Sleep for one second such that RVR has time to send data back

        time.sleep(1)


        rvr.get_battery_voltage_state(handler=battery_voltage_handler)


        # Sleep for one second such that RVR has time to send data back

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Power - Get Battery State Notifications

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver



rvr = SpheroRvrObserver()



def battery_voltage_state_change_handler(battery_voltage_state):

    print('Battery voltage state: ', battery_voltage_state)



def main():

    """ This program demonstrates how to enable battery state change notifications.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.on_battery_voltage_state_change_notify(handler=battery_voltage_state_change_handler)

        rvr.enable_battery_voltage_state_change_notify(is_enabled=True)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Power - Sleep Monitoring

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver



rvr = SpheroRvrObserver()



def will_sleep_handler():

    print('RVR is about to sleep...')


    # here we could issue a command to RVR, e.g. wake() such that the sleep timer is reset



def did_sleep_handler():

    print('RVR is asleep...')



def main():

    """ This program demonstrates how to register handlers for a) the event received 10 seconds

        before RVR will sleep unless some new command is issued and b) the event received

        when RVR does go to sleep.


        Note that these notifications are received without the need to enable them on the robot.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.on_will_sleep_notify(will_sleep_handler)


        rvr.on_did_sleep_notify(did_sleep_handler)


        # Sleep for 310 seconds such that we see the aforementioned events have time to occur

        time.sleep(310)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()


Sensor Streaming - Change Stream Settings

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import RvrStreamingServices



rvr = SpheroRvrObserver()



def imu_handler(imu_data):

    print('IMU data response: ', imu_data)



def ambient_light_handler(ambient_light_data):

    print('Ambient data response:', ambient_light_data)



def velocity_handler(velocity_data):

    print('Velocity data response:', velocity_data)



def main():

    """ This program demonstrates how to update sensor streaming parameters at runtime.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        print('----------', 'Enabling IMU at 100 ms', '----------')


        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.imu,

            handler=imu_handler

        )

        rvr.sensor_control.start(interval=100)


        # Delay to allow RVR to stream sensor data

        time.sleep(5)


        print('----------', 'Updating interval to 1000 ms', '----------')


        rvr.sensor_control.stop()

        rvr.sensor_control.start(interval=1000)


        # Delay to allow RVR to stream sensor data

        time.sleep(5)


        print('----------', 'Adding ambient light and velocity sensor streams', '----------')


        rvr.sensor_control.stop()

        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.ambient_light,

            handler=ambient_light_handler

        )

        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.velocity,

            handler=velocity_handler

        )

        rvr.sensor_control.start(interval=1000)


        # Delay to allow RVR to stream sensor data

        time.sleep(5)


        print('----------', 'Disabling IMU sensor stream and updating interval to 100 ms', '----------')


        rvr.sensor_control.stop()

        rvr.sensor_control.remove_sensor_data_handler(service=RvrStreamingServices.imu)

        rvr.sensor_control.start(interval=100)


        # Delay to allow RVR to stream sensor data

        time.sleep(5)


        print('----------', 'Clearing all services', '----------')


        rvr.sensor_control.clear()


        # Delay to allow RVR to stream sensor data

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        if len(rvr.sensor_control.enabled_sensors) > 0:

            rvr.sensor_control.clear()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)


        rvr.close()



if __name__ == '__main__':

    main()


Sensor Streaming - Multi Sensor Stream

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import RvrStreamingServices



rvr = SpheroRvrObserver()



def imu_handler(imu_data):

    print('IMU data response: ', imu_data)



def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



def accelerometer_handler(accelerometer_data):

    print('Accelerometer data response: ', accelerometer_data)



def ambient_light_handler(ambient_light_data):

    print('Ambient light data response: ', ambient_light_data)



def main():

    """ This program demonstrates how to enable multiple sensors to stream.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.imu,

            handler=imu_handler

        )

        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.color_detection,

            handler=color_detected_handler

        )

        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.accelerometer,

            handler=accelerometer_handler

        )

        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.ambient_light,

            handler=ambient_light_handler

        )


        rvr.sensor_control.start(interval=250)


        while True:

            # Delay to allow RVR to stream sensor data

            time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.sensor_control.clear()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)

        

        rvr.close()



if __name__ == '__main__':

    main()


Sensor Streaming - Single Sensor Stream

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import RvrStreamingServices



rvr = SpheroRvrObserver()



def accelerometer_handler(accelerometer_data):

    print('Accelerometer data response: ', accelerometer_data)



def main():

    """ This program demonstrates how to enable a single sensor to stream.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.sensor_control.add_sensor_data_handler(

            service=RvrStreamingServices.accelerometer,

            handler=accelerometer_handler

        )


        rvr.sensor_control.start(interval=250)


        while True:

            # Delay to allow RVR to stream sensor data

            time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.sensor_control.clear()


        # Delay to allow RVR issue command before closing

        time.sleep(.5)

        

        rvr.close()



if __name__ == '__main__':

    main()


System (Get Main App Version)

import os

import sys

import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


from sphero_sdk import SpheroRvrObserver

from sphero_sdk import SpheroRvrTargets



rvr = SpheroRvrObserver()



def get_nordic_main_application_version_handler(nordic_main_application_version):

    print('Nordic main application version (target 1): ', nordic_main_application_version)



def get_st_main_application_version_handler(st_main_application_version):

    print('ST main application version (target 2): ', st_main_application_version)



def main():

    """ This program demonstrates how to obtain the firmware version for a specific processor.

    """


    try:

        rvr.wake()


        # Give RVR time to wake up

        time.sleep(2)


        rvr.get_main_application_version(

            handler=get_nordic_main_application_version_handler,

            target=SpheroRvrTargets.primary.value

        )


        # Sleep for one second such that RVR has time to send data back

        time.sleep(1)


        rvr.get_main_application_version(

            handler=get_st_main_application_version_handler,

            target=SpheroRvrTargets.secondary.value

        )


        # Sleep for one second such that RVR has time to send data back

        time.sleep(1)


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


    finally:

        rvr.close()



if __name__ == '__main__':

    main()

Deprecated Options

Node.js SDK for Raspberry Pi 

Where to get the code:

Head over to the GitHub repo to grab the newest Node.js SDK!

Client.js SDK for Raspberry Pi 

Where to get the code: 

This GitHub repo contains the Client.js SDK!