Examples

Code examples can be found in the examples directory on GitHub

Also take a look at the unit tests in the tests directory.

Get all calling users

Source: calling_users.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Get all calling users within the org
 5"""
 6
 7from dotenv import load_dotenv
 8
 9from wxc_sdk import WebexSimpleApi
10
11load_dotenv(override=True)
12
13api = WebexSimpleApi()
14
15# using wxc_sdk.people.PeopleApi.list to iterate over persons
16# Parameter calling_data needs to be set to true to gat calling specific information
17# calling users have the attribute location_id set
18calling_users = [user for user in api.people.list(calling_data=True)
19                 if user.location_id]
20print(f'{len(calling_users)} users:')
21print('\n'.join(user.display_name for user in calling_users))

Get all calling users (async variant)

Source: calling_users_async.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Get all calling users within the org using the (experimental) async API
 5"""
 6import asyncio
 7import time
 8
 9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv(override=True)
14
15
16async def get_calling_users():
17    """
18    Get details of all calling enabled users by:
19    1) getting all calling users
20    2) collecting all users that have a calling license
21    3) getting details for all users
22    """
23    async with AsWebexSimpleApi(concurrent_requests=40) as api:
24        print('Collecting calling licenses')
25        calling_license_ids = set(lic.license_id for lic in await api.licenses.list()
26                                  if lic.webex_calling)
27
28        # get users with a calling license
29        calling_users = [user async for user in api.people.list_gen()
30                         if any(lic_id in calling_license_ids for lic_id in user.licenses)]
31        print(f'{len(calling_users)} users:')
32        print('\n'.join(user.display_name for user in calling_users))
33
34        # get details for all users
35        start = time.perf_counter()
36        details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
37                                         for user in calling_users])
38        expired = time.perf_counter() - start
39        print(f'Got details for {len(details)} users in {expired * 1000:.3f} ms')
40
41
42if __name__ == '__main__':
43    asyncio.run(get_calling_users())

Get all users without phones

Source: users_wo_devices.py

  1#!/usr/bin/env python
  2"""
  3Get calling users without devices
  4"""
  5import asyncio
  6import logging
  7import os
  8from itertools import chain
  9from typing import Optional
 10
 11from dotenv import load_dotenv
 12
 13from wxc_sdk import Tokens
 14from wxc_sdk.as_api import AsWebexSimpleApi
 15from wxc_sdk.common import UserType
 16from wxc_sdk.integration import Integration
 17from wxc_sdk.person_settings import DeviceList
 18from wxc_sdk.scopes import parse_scopes
 19
 20
 21def env_path() -> str:
 22    """
 23    determine path for .env to load environment variables from; based on name of this file
 24    :return: .env file path
 25    """
 26    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 27
 28
 29def yml_path() -> str:
 30    """
 31    determine path of YML file to persist tokens
 32    :return: path to YML file
 33    :rtype: str
 34    """
 35    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 36
 37
 38def build_integration() -> Integration:
 39    """
 40    read integration parameters from environment variables and create an integration
 41    :return: :class:`wxc_sdk.integration.Integration` instance
 42    """
 43    client_id = os.getenv('INTEGRATION_CLIENT_ID')
 44    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
 45    scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
 46    redirect_url = 'http://localhost:6001/redirect'
 47    if not all((client_id, client_secret, scopes)):
 48        raise ValueError('failed to get integration parameters from environment')
 49    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 50                       redirect_url=redirect_url)
 51
 52
 53def get_tokens() -> Optional[Tokens]:
 54    """
 55    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 56
 57    :return: tokens
 58    :rtype: :class:`wxc_sdk.tokens.Tokens`
 59    """
 60
 61    integration = build_integration()
 62    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
 63    return tokens
 64
 65
 66async def main():
 67    # get environment variables from .env; required for integration parameters
 68    load_dotenv(env_path())
 69
 70    # get tokens from cache or create a new set of tokens using the integration defined in .env
 71    tokens = get_tokens()
 72
 73    async with AsWebexSimpleApi(tokens=tokens) as api:
 74        # get calling users
 75        calling_users = [user for user in await api.people.list(calling_data=True)
 76                         if user.location_id]
 77
 78        # get device info for all users
 79        user_device_infos = await asyncio.gather(*[api.person_settings.devices(person_id=user.person_id)
 80                                                   for user in calling_users])
 81        user_device_infos: list[DeviceList]
 82        users_wo_devices = [user for user, device_info in zip(calling_users, user_device_infos)
 83                            if not device_info.devices]
 84
 85        # alternatively we can collect all device owners
 86        device_owner_ids = set(owner.owner_id
 87                               for device in chain.from_iterable(udi.devices for udi in user_device_infos)
 88                               if (owner := device.owner) and owner.owner_type == UserType.people)
 89
 90        # ... and collect all other users (the ones not owning a device)
 91        users_not_owning_a_device = [user for user in calling_users
 92                                     if user.person_id not in device_owner_ids]
 93
 94    users_wo_devices.sort(key=lambda u: u.display_name)
 95    print(f'{len(users_wo_devices)} users w/o devices:')
 96    print('\n'.join(f'{user.display_name} ({user.emails[0]})'
 97                    for user in users_wo_devices))
 98
 99    print()
100    users_not_owning_a_device.sort(key=lambda u: u.display_name)
101    print(f'{len(users_not_owning_a_device)} users not owning a device:')
102    print('\n'.join(f'{user.display_name} ({user.emails[0]})'
103                    for user in users_not_owning_a_device))
104
105
106if __name__ == '__main__':
107    # enable DEBUG logging to a file; REST log shows all requests
108    logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
109                        filemode='w', level=logging.DEBUG, format='%(asctime)s %(threadName)s %(message)s')
110    asyncio.run(main())

Default call forwarding settings for all users

This example start with the list of all calling users and then calls wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure() for each user. To speed up things a ThreadPoolExecutor is used and all update operations are scheduled for execution by the thread pool.

Source: reset_call_forwarding.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Reset call forwarding to default for all users in the org
 5"""
 6import asyncio
 7import logging
 8import time
 9
10from dotenv import load_dotenv
11
12from wxc_sdk.all_types import PersonForwardingSetting
13from wxc_sdk.as_api import AsWebexSimpleApi
14
15
16async def main():
17    # load environment. SDk fetches the access token from WEBEX_ACCESS_TOKEN environment variable
18    load_dotenv(override=True)
19
20    logging.basicConfig(level=logging.INFO)
21
22    # set to DEBUG to see the actual requests
23    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
24
25    async with AsWebexSimpleApi() as api:
26
27        # get all calling users
28        start = time.perf_counter_ns()
29        calling_users = [user for user in await api.people.list(calling_data=True)
30                         if user.location_id]
31        print(f'Got {len(calling_users)} calling users in '
32              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
33
34        # set call forwarding to default for all users
35        # default call forwarding settings
36        forwarding = PersonForwardingSetting.default()
37
38        # schedule update for each user and wait for completion
39        start = time.perf_counter_ns()
40        await asyncio.gather(*[api.person_settings.forwarding.configure(entity_id=user.person_id,
41                                                                        forwarding=forwarding)
42                               for user in calling_users])
43        print(f'Reset call forwarding to default for {len(calling_users)} users in '
44              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
45
46if __name__ == '__main__':
47    asyncio.run(main())

Modify number of rings configuration for users read from CSV

Here we read a bunch of user email addresses from a CSV. The CSV as an ERROR column and we only want to consider users without errors.

For all these users a VM setting is updated and the results are written to a CSV for further processing.

Source: modify_voicemail.py

 1#!/usr/bin/env python
 2"""
 3Example Script
 4Modifying number of rings configuration in voicemail settings
 5Run -> python3 modify_voicemail.py modify_voicemail.csv
 6"""
 7
 8import csv
 9import sys
10import traceback
11from concurrent.futures import ThreadPoolExecutor
12
13from dotenv import load_dotenv
14
15from wxc_sdk import WebexSimpleApi
16from wxc_sdk.all_types import *
17
18VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
19
20# loading environment variables - use .env file for development
21load_dotenv(override=True)
22
23
24def update_vm_settings():
25    """
26    actually update VM settings for all users present in input CSV
27    """
28    api = WebexSimpleApi()
29    final_report = []
30    mail_ids = []
31    # using wxc_sdk.people.PeopleApi.list to iterate over persons
32    # Parameter calling_data needs to be set to true to gat calling specific information
33    # calling users have the attribute location_id set
34    calling_users = [user for user in api.people.list(calling_data=True) if user.location_id]
35    print(f'{len(calling_users)} users:')
36    print('\n'.join(user.display_name for user in calling_users))
37
38    # get CSV file name from command line
39    with open(str(sys.argv[1])) as csv_file:
40        reader = csv.DictReader(csv_file)
41        # read all records from CSV. Ony consider records w/o error
42        for col in reader:
43            if not col['ERROR']:
44                # collect email address from USERNAME column for further processing
45                mail_ids.append(col['USERNAME'])
46            else:
47                final_report.append((col['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))
48
49    # work on calling users that have an email address that we read from the CSV
50    filteredUsers = [d for d in calling_users if d.emails[0] in mail_ids]
51
52    print('\nCalling Users in CI  - Count ', len(calling_users))
53    print('Mail IDs from input file after removing error columns - Count ', len(mail_ids))
54    print('FilteredUsers Users -  Count', len(filteredUsers))
55
56    def set_number_of_rings(user: Person):
57        """
58        Read VM config for a user
59        :param user: user to update
60        """
61        try:
62            # shortcut
63            vm = api.person_settings.voicemail
64
65            # Read Current configuration
66            vm_settings = vm.read(person_id=user.person_id)
67            print(f'\n Existing Configuration: {vm_settings} ')
68
69            # Modify number of rings value
70            vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
71            vm.configure(user.person_id, settings=vm_settings)
72            # Read configuration after changes
73            vm_settings = vm.read(user.person_id)
74            print(f'\n New Configuration: {vm_settings} ')
75            final_report.append((user.display_name, 'SUCCESS'))
76        except Exception as e:
77            final_report.append((user.display_name, 'FAILURE'))
78            print('type error: ' + str(e))
79            print(traceback.format_exc())
80        return
81
82    # Modify settings for the filtered users
83    with ThreadPoolExecutor() as pool:
84        list(pool.map(lambda user: set_number_of_rings(user), filteredUsers))
85
86    print(final_report)
87    with open('output.csv', 'w') as f:
88        write = csv.writer(f)
89        write.writerow(['USERNAME', 'STATUS'])
90        write.writerows(final_report)
91
92
93if __name__ == '__main__':
94    update_vm_settings()

Holiday schedule w/ US national holidays for all US locations

This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates a “National Holidays” holiday schedule for all US locations with all these national holidays.

A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/. Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get a free API account key which then is read from environment variable CALENDARIFIC_KEY.

Source: us_holidays.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4Create a holiday schedule for all US locations with all national holidays
  5"""
  6
  7import logging
  8from collections import defaultdict
  9from concurrent.futures import ThreadPoolExecutor
 10from datetime import date
 11from threading import Lock
 12from typing import List
 13
 14from calendarific import CalendarifiyApi, Holiday
 15from dotenv import load_dotenv
 16
 17from wxc_sdk import WebexSimpleApi
 18from wxc_sdk.all_types import Event, Schedule, ScheduleType
 19from wxc_sdk.locations import Location
 20
 21log = logging.getLogger(__name__)
 22
 23# a lock per location to protect observe_in_location()
 24location_locks: dict[str, Lock] = defaultdict(Lock)
 25
 26# Use parallel threads for provisioning?
 27USE_THREADING = True
 28
 29# True: delete holiday schedule instead of creating one
 30CLEAN_UP = False
 31
 32# first and last year for which to create public holiday events
 33FIRST_YEAR = 2022
 34LAST_YEAR = 2024
 35
 36LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
 37
 38
 39def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
 40    """
 41    create/update a "National Holiday" schedule in one location
 42
 43    :param api: Webex api
 44    :type api: WebexSimpleApi
 45    :param location: location to work on
 46    :type location: Location
 47    :param holidays: list of holidays to observe
 48    :type holidays: List[Holiday]
 49    """
 50    # there should always only one thread messing with the holiday schedule of a location
 51    with location_locks[location.location_id]:
 52        year = holidays[0].date.year
 53        schedule_name = 'National Holidays'
 54
 55        # shortcut
 56        ats = api.telephony.schedules
 57
 58        # existing "National Holiday" schedule or None
 59        schedule = next((schedule
 60                         for schedule in ats.list(obj_id=location.location_id,
 61                                                  schedule_type=ScheduleType.holidays,
 62                                                  name=schedule_name)
 63                         if schedule.name == schedule_name),
 64                        None)
 65        if CLEAN_UP:
 66            if schedule:
 67                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
 68                ats.delete_schedule(obj_id=location.location_id,
 69                                    schedule_type=ScheduleType.holidays,
 70                                    schedule_id=schedule.schedule_id)
 71            return
 72        if schedule:
 73            # we need the details: list response doesn't have events
 74            schedule = ats.details(obj_id=location.location_id,
 75                                   schedule_type=ScheduleType.holidays,
 76                                   schedule_id=schedule.schedule_id)
 77        # create list of desired schedule entries
 78        #   * one per holiday
 79        #   * only future holidays
 80        #   * not on a Sunday
 81        today = date.today()
 82        events = [Event(name=f'{holiday.name} {holiday.date.year}',
 83                        start_date=holiday.date,
 84                        end_date=holiday.date,
 85                        all_day_enabled=True)
 86                  for holiday in holidays
 87                  if holiday.date >= today and holiday.date.weekday() != 6]
 88
 89        if not schedule:
 90            # create new schedule
 91            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
 92            if not events:
 93                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
 94                return
 95            schedule = Schedule(name=schedule_name,
 96                                schedule_type=ScheduleType.holidays,
 97                                events=events)
 98            log.debug(
 99                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
100                f'events')
101            schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
102            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
103            return
104
105        # update existing schedule
106        with ThreadPoolExecutor() as pool:
107            # delete existing events in the past
108            to_delete = [event
109                         for event in schedule.events
110                         if event.start_date < today]
111            if to_delete:
112                log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113                if USE_THREADING:
114                    list(pool.map(
115                        lambda event: ats.event_delete(obj_id=location.location_id,
116                                                       schedule_type=ScheduleType.holidays,
117                                                       schedule_id=schedule.schedule_id,
118                                                       event_id=event.event_id),
119                        to_delete))
120                else:
121                    for event in to_delete:
122                        ats.event_delete(obj_id=location.location_id,
123                                         schedule_type=ScheduleType.holidays,
124                                         schedule_id=schedule.schedule_id,
125                                         event_id=event.event_id)
126
127            # add events which don't exist yet
128            existing_dates = set(event.start_date
129                                 for event in schedule.events)
130            to_add = [event
131                      for event in events
132                      if event.start_date not in existing_dates]
133            if not to_add:
134                log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
135                return
136            log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
137            if USE_THREADING:
138                list(pool.map(
139                    lambda event: ats.event_create(
140                        obj_id=location.location_id,
141                        schedule_type=ScheduleType.holidays,
142                        schedule_id=schedule.schedule_id,
143                        event=event),
144                    to_add))
145            else:
146                for event in to_add:
147                    ats.event_create(
148                        obj_id=location.location_id,
149                        schedule_type=ScheduleType.holidays,
150                        schedule_id=schedule.schedule_id,
151                        event=event)
152        log.info(f'observe_in_location({location.name}, {year}): done.')
153    return
154
155
156def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
157                              year: int = None):
158    """
159    US national holidays for given locations
160
161    :param api: Webex api
162    :type api: WebexSimpleApi
163    :param locations: list of locations in which US national holidays should be observed
164    :type locations: List[Location]
165    :param year: year for national holidays. Default: current year
166    :type year: int
167    """
168    # default: this year
169    year = year or date.today().year
170
171    # get national holidays for specified year
172    holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
173
174    # update holiday schedule for each location
175    with ThreadPoolExecutor() as pool:
176        if USE_THREADING:
177            list(pool.map(
178                lambda location: observe_in_location(api=api, location=location, holidays=holidays),
179                locations))
180        else:
181            for location in locations:
182                observe_in_location(api=api, location=location, holidays=holidays)
183    return
184
185
186if __name__ == '__main__':
187    # read dotenv which has some environment variables like Webex API token and Calendarify
188    # API key.
189    load_dotenv(override=True)
190
191    # enable logging
192    logging.basicConfig(level=logging.DEBUG,
193                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
194    logging.getLogger('urllib3').setLevel(logging.INFO)
195    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
196
197    # the actual action
198    with WebexSimpleApi(concurrent_requests=5) as wx_api:
199        # get all US locations
200        log.info('Getting locations...')
201        us_locations = [location
202                        for location in wx_api.locations.list()
203                        if location.address.country == 'US']
204
205        # set up location locks
206        # location_locks is a defaultdict -> accessing with all potential keys creates the locks
207        list(location_locks[loc.location_id] for loc in us_locations)
208
209        # create national holiday schedule for given year(s) and locations
210        if USE_THREADING:
211            with ThreadPoolExecutor() as pool:
212                list(pool.map(
213                    lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
214                    range(FIRST_YEAR, LAST_YEAR + 1)))
215        else:
216            for year in range(FIRST_YEAR, LAST_YEAR + 1):
217                observe_national_holidays(api=wx_api, year=year, locations=us_locations)

Holiday schedule w/ US national holidays for all US locations (async variant)

This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates a “National Holidays” holiday schedule for all US locations with all these national holidays.

A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/. Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get a free API account key which then is read from environment variable CALENDARIFIC_KEY.

Source: us_holidays_async.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4Create a holiday schedule for all US locations with all national holidays
  5
  6Using the asyc SDK variant
  7"""
  8import asyncio
  9import functools
 10import logging
 11from collections import defaultdict
 12from datetime import date
 13from typing import List
 14
 15from calendarific import CalendarifiyApi, Holiday
 16from dotenv import load_dotenv
 17
 18from wxc_sdk.all_types import Event, Schedule, ScheduleType
 19from wxc_sdk.as_api import AsWebexSimpleApi
 20from wxc_sdk.locations import Location
 21
 22log = logging.getLogger(__name__)
 23
 24# a lock per location to protect observe_in_location()
 25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
 26
 27# Use parallel tasks for provisioning?
 28USE_TASKS = True
 29
 30# True: delete holiday schedule instead of creating one
 31CLEAN_UP = False
 32
 33# first and last year for which to create public holiday events
 34FIRST_YEAR = 2024
 35LAST_YEAR = 2026
 36
 37LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
 38
 39
 40async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
 41    """
 42    create/update a "National Holiday" schedule in one location
 43
 44    :param api: Webex api
 45    :type api: WebexSimpleApi
 46    :param location: location to work on
 47    :type location: Location
 48    :param holidays: list of holidays to observe
 49    :type holidays: List[Holiday]
 50    """
 51    # there should always only one thread messing with the holiday schedule of a location
 52    async with location_locks[location.location_id]:
 53        year = holidays[0].date.year
 54        schedule_name = 'National Holidays'
 55
 56        # shortcut
 57        ats = api.telephony.schedules
 58
 59        # existing "National Holiday" schedule or None
 60        schedule = next((schedule
 61                         for schedule in await ats.list(obj_id=location.location_id,
 62                                                        schedule_type=ScheduleType.holidays,
 63                                                        name=schedule_name)
 64                         if schedule.name == schedule_name),
 65                        None)
 66        if CLEAN_UP:
 67            if schedule:
 68                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
 69                await ats.delete_schedule(obj_id=location.location_id,
 70                                          schedule_type=ScheduleType.holidays,
 71                                          schedule_id=schedule.schedule_id)
 72            return
 73        if schedule:
 74            # we need the details: list response doesn't have events
 75            schedule = await ats.details(obj_id=location.location_id,
 76                                         schedule_type=ScheduleType.holidays,
 77                                         schedule_id=schedule.schedule_id)
 78        # create list of desired schedule entries
 79        #   * one per holiday
 80        #   * only future holidays
 81        #   * not on a Sunday
 82        today = date.today()
 83        events = [Event(name=f'{holiday.name} {holiday.date.year}',
 84                        start_date=holiday.date,
 85                        end_date=holiday.date,
 86                        all_day_enabled=True)
 87                  for holiday in holidays
 88                  if holiday.date >= today and holiday.date.weekday() != 6]
 89
 90        if not schedule:
 91            # create new schedule
 92            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
 93            if not events:
 94                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
 95                return
 96            schedule = Schedule(name=schedule_name,
 97                                schedule_type=ScheduleType.holidays,
 98                                events=events)
 99            log.debug(
100                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
101                f'events')
102            schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
103            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
104            return
105
106        # update existing schedule
107        # delete existing events in the past
108        to_delete = [event
109                     for event in schedule.events
110                     if event.start_date < today]
111        if to_delete:
112            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113            if USE_TASKS:
114                await asyncio.gather(*[ats.event_delete(obj_id=location.location_id,
115                                                        schedule_type=ScheduleType.holidays,
116                                                        schedule_id=schedule.schedule_id,
117                                                        event_id=event.event_id)
118                                       for event in to_delete])
119            else:
120                for event in to_delete:
121                    await ats.event_delete(obj_id=location.location_id,
122                                           schedule_type=ScheduleType.holidays,
123                                           schedule_id=schedule.schedule_id,
124                                           event_id=event.event_id)
125
126        # add events which don't exist yet
127        existing_dates = set(event.start_date
128                             for event in schedule.events)
129        to_add = [event
130                  for event in events
131                  if event.start_date not in existing_dates]
132        if not to_add:
133            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
134            return
135        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
136        if USE_TASKS:
137            await asyncio.gather(*[ats.event_create(obj_id=location.location_id,
138                                                    schedule_type=ScheduleType.holidays,
139                                                    schedule_id=schedule.schedule_id,
140                                                    event=event)
141                                   for event in to_add])
142        else:
143            for event in to_add:
144                await ats.event_create(obj_id=location.location_id,
145                                       schedule_type=ScheduleType.holidays,
146                                       schedule_id=schedule.schedule_id,
147                                       event=event)
148        log.info(f'observe_in_location({location.name}, {year}): done.')
149    return
150
151
152async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
153                                    year: int = None):
154    """
155    US national holidays for given locations
156
157    :param api: Webex api
158    :type api: WebexSimpleApi
159    :param locations: list of locations in which US national holidays should be observed
160    :type locations: List[Location]
161    :param year: year for national holidays. Default: current year
162    :type year: int
163    """
164    # default: this year
165    year = year or date.today().year
166
167    # get national holidays for specified year
168    loop = asyncio.get_running_loop()
169    # avoid sync all:
170    # holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
171    holidays = await loop.run_in_executor(None, functools.partial(CalendarifiyApi().holidays,
172                                                                  country='US', year=year, holiday_type='national'))
173
174    # update holiday schedule for each location
175    if USE_TASKS:
176        await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
177                               for location in locations])
178    else:
179        for location in locations:
180            await observe_in_location(api=api, location=location, holidays=holidays)
181    return
182
183
184if __name__ == '__main__':
185    # read dotenv which has some environment variables like Webex API token and Calendarify
186    # API key.
187    load_dotenv(override=True)
188
189    # enable logging
190    logging.basicConfig(level=logging.DEBUG,
191                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
192    logging.getLogger('urllib3').setLevel(logging.INFO)
193    logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)
194
195    # the actual action
196    async def do_provision():
197
198        async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
199            # get all US locations
200            log.info('Getting locations...')
201            us_locations = [location
202                            for location in await wx_api.locations.list()
203                            if location.address.country == 'US']
204
205            # set up location locks
206            # location_locks is a defaultdict -> accessing with all potential keys creates the locks
207            list(location_locks[loc.location_id] for loc in us_locations)
208
209            # create national holiday schedule for given year(s) and locations
210            if USE_TASKS:
211                await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
212                                       for year in range(FIRST_YEAR, LAST_YEAR + 1)])
213            else:
214                for year in range(FIRST_YEAR, LAST_YEAR + 1):
215                    await observe_national_holidays(api=wx_api, year=year, locations=us_locations)
216
217    asyncio.run(do_provision())

Persist tokens and obtain new tokens interactively

A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.

This example code shows how an OAUth Grant flow for an integration can be initiated from a script by using the Python webbrowser module and calling the open() method with the authorization URL of a given integration to open that URL in the system web browser. The user can then authenticate and grant access to the integration. In the last step of a successful authorization flow the web browser is redirected to the redirect_url of the integration.

The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect. This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.

The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID, TOKEN_INTEGRATION_CLIENT_SECRET, TOKEN_INTEGRATION_CLIENT_SCOPES). These variables can also be defined in get_tokens.env in the current directory:

# rename this to get_tokens.env and set values

# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=

# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=

# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"

The sample code persists the tokens in get_tokens.yml in the current directory. On startup the sample code tries to read tokens from that file. If needed a new access token is obtained using the refresh token.

An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml

Source: get_tokens.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
 5the local web browser
 6"""
 7import logging
 8import os
 9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import WebexSimpleApi
14from wxc_sdk.integration import Integration
15from wxc_sdk.scopes import parse_scopes
16from wxc_sdk.tokens import Tokens
17
18log = logging.getLogger(__name__)
19
20
21def env_path() -> str:
22    """
23    determine path for .env to load environment variables from
24
25    :return: .env file path
26    """
27    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
28
29
30def yml_path() -> str:
31    """
32    determine path of YML file to persist tokens
33
34    :return: path to YML file
35    :rtype: str
36    """
37    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
38
39
40def build_integration() -> Integration:
41    """
42    read integration parameters from environment variables and create an integration
43
44    :return: :class:`wxc_sdk.integration.Integration` instance
45    """
46    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
47    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
48    scopes = parse_scopes(os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES'))
49    redirect_url = 'http://localhost:6001/redirect'
50    if not all((client_id, client_secret, scopes)):
51        raise ValueError('failed to get integration parameters from environment')
52    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
53                       redirect_url=redirect_url)
54
55
56def get_tokens() -> Optional[Tokens]:
57    """
58
59    Tokens are read from a YML file. If needed an OAuth flow is initiated.
60
61    :return: tokens
62    :rtype: :class:`wxc_sdk.tokens.Tokens`
63    """
64
65    integration = build_integration()
66    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
67    return tokens
68
69
70if __name__ == '__main__':
71    logging.basicConfig(level=logging.DEBUG)
72
73    # load environment variables from .env
74    path = env_path()
75    log.info(f'reading {path}')
76    load_dotenv(env_path())
77
78    tokens = get_tokens()
79
80    # use the tokens to get identity of authenticated user
81    api = WebexSimpleApi(tokens=tokens)
82    me = api.people.me()
83    print(f'authenticated as {me.display_name} ({me.emails[0]}) ')

Read/update call intercept settings of a user

 Usage: call_intercept.py [OPTIONS] USER_EMAIL [ON_OFF]:[on|off]

 read/update call intercept settings of a calling user.

╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
│ *    user_email      EMAIL_TYPE         email address of user [required]                   │
│      on_off          [ON_OFF]:[on|off]  operation to apply.                                │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
│ --token                     TEXT  Access token can be provided using --token argument, set │
│                                   in WEBEX_ACCESS_TOKEN environment variable or can be a   │
│                                   service app token. For the latter set environment        │
│                                   variables ('SERVICE_APP_REFRESH_TOKEN',                  │
│                                   'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET').   │
│                                   Environment variables can also be set in service_app.env │
│ --install-completion              Install completion for the current shell.                │
│ --show-completion                 Show completion for the current shell, to copy it or     │
│                                   customize the installation.                              │
│ --help                            Show this message and exit.                              │
╰────────────────────────────────────────────────────────────────────────────────────────────╯

 Example: ./call_intercept.py bob@example.com on

Source: call_intercept.py

  1#!/usr/bin/env -S uv run --script
  2# /// script
  3# requires-python = ">=3.11,<3.14"
  4# dependencies = [
  5#     "email-validator",
  6#     "python-dotenv",
  7#     "typer",
  8#     "wxc-sdk",
  9# ]
 10# ///
 11"""
 12Script to read/update call intercept settings of a calling user.
 13
 14 Usage: call_intercept.py [OPTIONS] USER_EMAIL [ON_OFF]:[on|off]
 15
 16 read/update call intercept settings of a calling user.
 17
 18╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
 19│ *    user_email      EMAIL_TYPE         email address of user [required]                   │
 20│      on_off          [ON_OFF]:[on|off]  operation to apply.                                │
 21╰────────────────────────────────────────────────────────────────────────────────────────────╯
 22╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
 23│ --token                     TEXT  Access token can be provided using --token argument, set │
 24│                                   in WEBEX_ACCESS_TOKEN environment variable or can be a   │
 25│                                   service app token. For the latter set environment        │
 26│                                   variables ('SERVICE_APP_REFRESH_TOKEN',                  │
 27│                                   'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET').   │
 28│                                   Environment variables can also be set in service_app.env │
 29│ --install-completion              Install completion for the current shell.                │
 30│ --show-completion                 Show completion for the current shell, to copy it or     │
 31│                                   customize the installation.                              │
 32│ --help                            Show this message and exit.                              │
 33╰────────────────────────────────────────────────────────────────────────────────────────────╯
 34
 35 Example: ./call_intercept.py bob@example.com on
 36"""
 37import logging
 38import os
 39import sys
 40from enum import Enum
 41from typing import Optional
 42
 43import typer
 44from dotenv import load_dotenv
 45from email_validator import validate_email
 46from service_app import SERVICE_APP_ENVS, env_path, get_tokens
 47
 48from wxc_sdk import WebexSimpleApi
 49from wxc_sdk.person_settings.call_intercept import InterceptSetting
 50from wxc_sdk.rest import RestError
 51from wxc_sdk.tokens import Tokens
 52
 53log = logging.getLogger(__name__)
 54
 55
 56def email_type(value: str):
 57    """
 58    Validate email address
 59    """
 60    validated = validate_email(value, check_deliverability=False)
 61    return validated.normalized
 62
 63
 64class OnOff(str, Enum):
 65    ON = 'on'
 66    OFF = 'off'
 67
 68
 69app = typer.Typer()
 70
 71
 72@app.command(name=os.path.basename(__file__),
 73             help='read/update call intercept settings of a calling user.',
 74             epilog=f'Example: {sys.argv[0]} bob@example.com on')
 75def main(user_email: str = typer.Argument(...,
 76                                          help='email address of user',
 77                                          parser=email_type),
 78         on_off: Optional[OnOff] = typer.Argument(None,
 79                                                  help='operation to apply.'),
 80         token: Optional[str] = typer.Option(None, '--token',
 81                                             help=f'Access token can be provided using --token argument, '
 82                                                  f'set in WEBEX_ACCESS_TOKEN environment variable or '
 83                                                  f'can be a service app token. For the latter set '
 84                                                  f'environment variables {SERVICE_APP_ENVS}. '
 85                                                  f'Environment variables can also be set in '
 86                                                  f'{env_path()}')):
 87    """
 88    where the magic happens
 89    """
 90    # get tokens
 91    load_dotenv(env_path(), override=True)
 92
 93    tokens = get_tokens() if token is None else Tokens(access_token=token)
 94    if tokens is None:
 95        print(f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable or '
 96              f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
 97              f'variables can '
 98              f'also be set in {env_path()}', file=sys.stderr)
 99
100    # set level to DEBUG to see debug of REST requests
101    logging.basicConfig(level=(gt := getattr(sys, 'gettrace', None)) and gt() and logging.DEBUG or logging.INFO)
102
103    with WebexSimpleApi(tokens=tokens) as api:
104        # get user
105        email = user_email.lower()
106        user = next((user
107                     for user in api.people.list(email=email)
108                     if user.emails[0] == email), None)
109        if user is None:
110            print(f'User "{email}" not found', file=sys.stderr)
111            exit(1)
112
113        # display call intercept status
114        try:
115            intercept = api.person_settings.call_intercept.read(user.person_id)
116        except RestError as e:
117            print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
118            exit(1)
119
120        print('on' if intercept.enabled else 'off')
121        if on_off:
122            # action: turn on/off
123            intercept = InterceptSetting.default()
124            intercept.enabled = on_off == OnOff.ON
125            try:
126                api.person_settings.call_intercept.configure(user.person_id,
127                                                             intercept=intercept)
128            except RestError as e:
129                print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}')
130                exit(1)
131
132            # read call intercept again
133            try:
134                intercept = api.person_settings.call_intercept.read(user.person_id)
135            except RestError as e:
136                print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
137                exit(1)
138
139            # display state after update
140            print(f"set to {'on' if intercept.enabled else 'off'}")
141
142    exit(0)
143
144
145if __name__ == '__main__':
146    app()

Read/update call queue agent join states

usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]]
                       [--queue QUEUE [QUEUE ...]]
                       [--join JOIN_AGENT [JOIN_AGENT ...]]
                       [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
                       [--remove REMOVE_USER [REMOVE_USER ...]]
                       [--add ADD_USER [ADD_USER ...]] [--dryrun]
                       [--token TOKEN]

Modify call queue settings from the CLI

optional arguments:
  -h, --help            show this help message and exit
  --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
                        name of location to work on. If missing then work on
                        all locations.
  --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
                        name(s) of queue(s) to operate on. If missing then
                        work on all queues in location.
  --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
                        Join given user(s) on given queue(s). Can be "all" to
                        act on all agents.
  --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
                        Unjoin given agent(s) from given queue(s). Can be
                        "all" to act on all agents.
  --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
                        Remove given agent from given queue(s). Can be "all"
                        to act on all agents.
  --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
                        Add given users to given queue(s).
  --dryrun, -d          Dry run; don't apply any changes
  --token TOKEN         admin access token to use

The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables which can be set in a queue_helper.env file in the current directory.

Source: queue_helper.py

  1#!/usr/bin/env python
  2"""
  3usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
  4                       [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
  5                       [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
  6                       [--token TOKEN]
  7
  8Modify call queue settings from the CLI
  9
 10optional arguments:
 11  -h, --help            show this help message and exit
 12  --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
 13                        name of location to work on. If missing then work on all locations.
 14  --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
 15                        name(s) of queue(s) to operate on. If missing then work on all queues in location.
 16  --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
 17                        Join given user(s) on given queue(s). Can be "all" to act on all agents.
 18  --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
 19                        Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
 20  --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
 21                        Remove given agent from given queue(s). Can be "all" to act on all agents.
 22  --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
 23                        Add given users to given queue(s).
 24  --dryrun, -d          Dry run; don't apply any changes
 25  --token TOKEN         admin access token to use
 26"""
 27import asyncio
 28import logging
 29import os
 30import sys
 31from argparse import ArgumentParser
 32from collections.abc import AsyncGenerator
 33from typing import Optional
 34
 35from dotenv import load_dotenv
 36
 37from wxc_sdk.as_api import AsWebexSimpleApi
 38from wxc_sdk.integration import Integration
 39from wxc_sdk.people import Person
 40from wxc_sdk.scopes import parse_scopes
 41from wxc_sdk.telephony.callqueue import CallQueue
 42from wxc_sdk.telephony.hg_and_cq import Agent
 43from wxc_sdk.tokens import Tokens
 44
 45
 46def agent_name(agent: Agent) -> str:
 47    return f'{agent.first_name} {agent.last_name}'
 48
 49
 50def env_path() -> str:
 51    """
 52    determine path for .env to load environment variables from; based on name of this file
 53    :return: .env file path
 54    """
 55    return f'{os.path.splitext(__file__)[0]}.env'
 56
 57
 58def yml_path() -> str:
 59    """
 60    determine path of YML file to persist tokens
 61    :return: path to YML file
 62    :rtype: str
 63    """
 64    return f'{os.path.splitext(__file__)[0]}.yml'
 65
 66
 67def build_integration() -> Integration:
 68    """
 69    read integration parameters from environment variables and create an integration
 70    :return: :class:`wxc_sdk.integration.Integration` instance
 71    """
 72    client_id = os.getenv('INTEGRATION_CLIENT_ID')
 73    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
 74    scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
 75    redirect_url = 'http://localhost:6001/redirect'
 76    if not all((client_id, client_secret, scopes)):
 77        raise ValueError('failed to get integration parameters from environment')
 78    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 79                       redirect_url=redirect_url)
 80
 81
 82def get_tokens() -> Optional[Tokens]:
 83    """
 84    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 85
 86    :return: tokens
 87    :rtype: :class:`wxc_sdk.tokens.Tokens`
 88    """
 89
 90    integration = build_integration()
 91    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
 92    return tokens
 93
 94
 95async def main():
 96    async def act_on_queue(queue: CallQueue):
 97        """
 98        Act on a single queue
 99        """
100        # we need the queue details b/c the queue instance passed as parameter is from a list() call
101        # ... and thus is missing all the details like agents
102        details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
103        agent_names = set(map(agent_name, details.agents))
104
105        def notify(message: str) -> str:
106            """
107            an action notification with queue information
108            """
109            return f'queue "{details.name:{queue_len}}" in "{queue.location_name:{location_len}}": {message}'
110
111        def validate_agents(names: list[str], operation: str) -> list[str]:
112            """
113            check if all names in given list exist as agents on current queue
114            """
115            if 'all' in names:
116                return set(agent_names)
117
118            not_found = [name for name in names if name not in agent_names]
119            if not_found:
120                print('\n'.join(notify(f'{name} not found for {operation}"')
121                                for name in not_found),
122                      file=sys.stderr)
123            return set(name for name in names if name not in set(not_found))
124
125        # validate list of names or join, unjoin, and remove against actual list of agents
126        to_join = validate_agents(join_agents, 'join')
127        to_unjoin = validate_agents(unjoin_agents, 'unjoin')
128        to_remove = validate_agents(remove_users, 'remove')
129
130        # check for agents we are asked to add but which already exist as agents on the queue
131        existing_agent_ids = set(agent.agent_id for agent in details.agents)
132        agent_exists = [agent_name(user) for user in add_users
133                        if user.person_id in existing_agent_ids]
134        if agent_exists:
135            print('\n'.join(notify(f'{name} already is agent')
136                            for name in agent_exists),
137                  file=sys.stderr)
138            # reduced set of users to add
139            to_add = [user for user in add_users
140                      if agent_name(user) not in set(agent_exists)]
141        else:
142            # ...  or add all users
143            to_add = add_users
144
145        # the updated list of agents for the current queue
146        new_agents = []
147
148        # do we actually need an update?
149        update_needed = False
150
151        # create copy of each agent instance; we don't want to update the original agent objects
152        # to make sure that details still holds the state before any update
153        agents = [agent.copy(deep=True) for agent in details.agents]
154
155        # iterate through the existing agents and see if we have to apply any change
156        for agent in agents:
157            name = agent_name(agent)
158            # do we have to take action to join this agent?
159            if name in to_join and not agent.join_enabled:
160                print(notify(f'{name}, join'))
161                update_needed = True
162                agent.join_enabled = True
163            # do we have to take action to unjoin this agent?
164            if name in to_unjoin and agent.join_enabled:
165                print(notify(f'{name}, unjoin'))
166                update_needed = True
167                agent.join_enabled = False
168            # do we have to remove this agent?
169            if name in to_remove:
170                print(notify(f'{name}, remove'))
171                update_needed = True
172                # skip to next agent; so that we don't add this agent to the updated list of agents
173                continue
174            new_agents.append(agent)
175
176        # add new agents
177        new_agents.extend(Agent(id=user.person_id)
178                          for user in to_add)
179
180        # update the queue
181        if (update_needed or to_add) and not args.dryrun:
182            # simplified update: we only messed with the agents
183            update = CallQueue(agents=new_agents)
184            await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id,
185                                                 update=update)
186            print(notify('queue updated'))
187            # and get details after the update
188            details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
189            print(notify('got details after update'))
190
191        # print summary
192        print(f'queue "{queue.name:{queue_len}}" in "{queue.location_name}"')
193        print(f'  phone number: {details.phone_number}')
194        print(f'  extension: {details.extension}')
195        print('  agents')
196        if details.agents:
197            name_len = max(map(len, map(agent_name, details.agents)))
198            for agent in details.agents:
199                print(f'    {agent_name(agent):{name_len}}: {"not " if not agent.join_enabled else ""}joined')
200        return
201
202    async def validate_users(user_names: list[str]) -> AsyncGenerator[Person, None, None]:
203        """
204        Validate list of names of users to be added and yield a Person instance for each one
205        """
206        # search for all names in parallel
207        lists: list[list[Person]] = await asyncio.gather(
208            *[api.people.list(display_name=name) for name in user_names], return_exceptions=True)
209        for name, user_list in zip(user_names, lists):
210            if isinstance(user_list, Exception):
211                user = None
212            else:
213                user = next((u for u in user_list if name == agent_name(u)), None)
214            if user is None:
215                print(f'user "{name}" not found', file=sys.stderr)
216                continue
217            yield user
218        return
219
220    # parse command line
221    parser = ArgumentParser(description='Modify call queue settings from the CLI')
222    parser.add_argument('--location', '-l', type=str, required=False, nargs='+',
223                        help='name of location to work on. If missing then work on all locations.')
224
225    parser.add_argument('--queue', '-q', type=str, required=False, nargs='+',
226                        help='name(s) of queue(s) to operate on. If missing then work on all queues in location.')
227
228    parser.add_argument('--join', '-j', type=str, required=False, nargs='+', dest='join_agent',
229                        help='Join given user(s) on given queue(s). Can be "all" to act on all agents.')
230
231    parser.add_argument('--unjoin', '-u', type=str, required=False, nargs='+', dest='unjoin_agent',
232                        help='Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.')
233
234    parser.add_argument('--remove', '-r', type=str, required=False, nargs='+', dest='remove_user',
235                        help='Remove given agent from given queue(s). Can be "all" to act on all agents.')
236
237    parser.add_argument('--add', '-a', type=str, required=False, nargs='+', dest='add_user',
238                        help='Add given users to given queue(s).')
239    parser.add_argument('--dryrun', '-d', required=False, action='store_true',
240                        help='Dry run; don\'t apply any changes')
241    parser.add_argument('--token', type=str, required=False, help='admin access token to use')
242
243    args = parser.parse_args()
244
245    # get environment variables from .env; required for integration parameters
246    load_dotenv(env_path())
247
248    tokens = args.token or None
249    if tokens is None:
250        # get tokens from cache or create a new set of tokens using the integration defined in .env
251        tokens = get_tokens()
252
253    async with AsWebexSimpleApi(tokens=tokens) as api:
254        # validate location parameter
255        location_names = args.location or []
256
257        # list of all locations with names matching one of the provided names
258        locations = [loc for loc in await api.locations.list()
259                     if not location_names or loc.name in set(location_names)]
260
261        if not location_names:
262            print(f'Considering all {len(locations)} locations')
263
264        # set of names of matching locations
265        found_location_names = set(loc.name for loc in locations)
266
267        # Error message for each location name argument not matching an actual location
268        for location_name in location_names:
269            if location_name not in found_location_names:
270                print(f'location "{location_name}" not found', file=sys.stderr)
271
272        if not locations:
273            print('Found no locations to work on', file=sys.stderr)
274            exit(1)
275
276        # which queues do we need to operate on?
277        location_ids = set(loc.location_id for loc in locations)
278        queue_names = args.queue
279        all_queues = queue_names is None
280        # full list of queues
281        queues = await api.telephony.callqueue.list()
282        # filter based on location parameter
283        queues = [queue for queue in queues
284                  if (all_queues or queue.name in queue_names) and queue.location_id in location_ids]
285
286        # len of queue names for nicer output
287        queue_len = max(len(queue.name) for queue in queues)
288
289        # now we can actually go back and re-evaluate the list of locations; for the location length we only need
290        # to consider locations we actually have a target queue in
291        location_ids = set(queue.location_id for queue in queues)
292
293        # max length of location names for nicely formatted output
294        location_len = max(len(loc.name)
295                           for loc in locations
296                           if loc.location_id in location_ids)
297
298        # get the names for join, unjoin, remove, and add
299        join_agents = args.join_agent or []
300        unjoin_agents = args.unjoin_agent or []
301        remove_users = args.remove_user or []
302        add_users = args.add_user or []
303
304        # validate users; make sure that users exist with the provided names
305        add_users = [u async for u in validate_users(user_names=add_users)]
306
307        # apply actions to all queues
308        await asyncio.gather(*[act_on_queue(queue) for queue in queues])
309
310
311if __name__ == '__main__':
312    # enable DEBUG logging to a file; REST log shows all requests
313    logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
314                        filemode='w', level=logging.DEBUG)
315    asyncio.run(main())

Using service APP tokens to access a API endpoints

The script uses service app credentials to get an access token and then use this access token to call Webex Calling APIs.

Source: service_app.py

  1#!/usr/bin/env -S uv run --script
  2# /// script
  3# requires-python = ">=3.11,<3.14"
  4# dependencies = [
  5#     "python-dotenv",
  6#     "wxc-sdk",
  7# ]
  8# ///
  9"""
 10Demo for Webex service app: using service APP tokens to access a API endpoints
 11"""
 12
 13import inspect
 14import logging
 15import os
 16import sys
 17from json import dumps, loads
 18from os import getenv
 19from typing import Optional
 20
 21import yaml
 22from dotenv import load_dotenv
 23
 24from wxc_sdk import WebexSimpleApi
 25from wxc_sdk.integration import Integration
 26from wxc_sdk.tokens import Tokens
 27
 28SERVICE_APP_ENVS = ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET')
 29
 30
 31def yml_path(*, client_id: str) -> str:
 32    """
 33    Get filename for YML file to cache access and refresh token
 34    """
 35    return f'tokens_{client_id}.yml'
 36
 37
 38def env_path() -> str:
 39    """
 40    Get path to .env file to read service app settings from
 41    """
 42    # get the file name of the calling script to determine the name of the .env file
 43    frame = inspect.currentframe().f_back
 44    file_name = inspect.getframeinfo(frame).filename
 45    env_name = f'{os.path.splitext(os.path.basename(file_name))[0]}.env'
 46    if not os.path.isfile(env_name):
 47        # fallback to .env specific to this file
 48        env_name = f'{os.path.splitext(os.path.basename(__file__))[0]}.env'
 49    return env_name
 50
 51
 52def read_tokens_from_file(*, client_id: str) -> Optional[Tokens]:
 53    """
 54    Get service app tokens from cache file, return None if cache does not exist or read fails
 55    """
 56    path = yml_path(client_id=client_id)
 57    if not os.path.isfile(path):
 58        return None
 59    try:
 60        with open(path) as f:
 61            data = yaml.safe_load(f)
 62        tokens = Tokens.model_validate(data)
 63    except Exception:
 64        return None
 65    return tokens
 66
 67
 68def write_tokens_to_file(*, client_id: str, tokens: Tokens):
 69    """
 70    Write tokens to cache
 71    """
 72    with open(yml_path(client_id=client_id), mode='w') as f:
 73        yaml.safe_dump(tokens.model_dump(exclude_none=True), f)
 74
 75
 76def get_access_token(*, client_id: str, client_secret: str, refresh: str) -> Tokens:
 77    """
 78    Get a new access token using refresh token, service app client id, service app client secret
 79    """
 80    tokens = Tokens(refresh_token=refresh)
 81    integration = Integration(client_id=client_id, client_secret=client_secret, scopes=[], redirect_url=None)
 82    integration.refresh(tokens=tokens)
 83    write_tokens_to_file(client_id=client_id, tokens=tokens)
 84    return tokens
 85
 86
 87def get_tokens() -> Optional[Tokens]:
 88    """
 89    Get tokens from environment variable, cache or create new access token using service app credentials
 90    """
 91    refresh, client_id, client_secret = (os.getenv(env) for env in SERVICE_APP_ENVS)
 92    if not all((refresh, client_id, client_secret)):
 93        token = os.getenv('WEBEX_ACCESS_TOKEN')
 94        if token is None:
 95            return None
 96        tokens = Tokens(access_token=token)
 97    else:
 98        # try to read from file
 99        tokens = read_tokens_from_file(client_id=client_id)
100        # .. or create new access token using refresh token
101        if tokens is None:
102            tokens = get_access_token(client_id=client_id, client_secret=client_secret, refresh=refresh)
103        if tokens.expires_in is not None and tokens.remaining < 24 * 60 * 60:
104            tokens = get_access_token(client_id=client_id, client_secret=client_secret, refresh=refresh)
105    return tokens
106
107
108def service_app():
109    """
110    Use service app access token to call Webex Calling API endpoints
111    """
112    load_dotenv(env_path())
113    # assert that all required environment variable are set
114    if not all(getenv(s) for s in SERVICE_APP_ENVS):
115        print(
116            f'{", ".join(SERVICE_APP_ENVS)} environment variables need to be defined in '
117            f'environment or in "{env_path()}"',
118            file=sys.stderr,
119        )
120        exit(1)
121
122    # get tokens and dump to console
123    tokens = get_tokens()
124
125    print(dumps(loads(tokens.json()), indent=2))
126    print()
127    print('scopes:')
128    print('\n'.join(f' * {s}' for s in sorted(tokens.scope.split())))
129
130    # use tokens to access APIs
131    with WebexSimpleApi(tokens=tokens) as api:
132        users = list(api.people.list())
133        print(f'{len(users)} users')
134
135        queues = list(api.telephony.callqueue.list())
136        print(f'{len(queues)} call queues')
137    return
138
139
140if __name__ == '__main__':
141    logging.basicConfig(level=logging.DEBUG)
142    service_app()

Pool unassigned TNs on hunt groups to catch calls to unassigned TNs

This script looks for unassigned TNs and assigns them to HGs that are forwarded to the locations main number. The idea is to catch all incoming calls to unassigned TNs and handle them accordingly.

usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
                    [--cleanup]

optional arguments:
  -h, --help           show this help message and exit
  --test               test only; don't actually apply any config
  --location LOCATION  Location to work on
  --token TOKEN        admin access token to use.
  --cleanup            remove all pooling HGs

Source: catch_tns.py

  1#!/usr/bin/env python
  2"""
  3This script looks for unassigned TNs and assigns them to HGs that are forwarded to the locations main number.
  4The idea is to catch all incoming calls to unassigned TNs and handle them accordingly
  5
  6    usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
  7                        [--cleanup]
  8
  9    optional arguments:
 10      -h, --help           show this help message and exit
 11      --test               test only; don't actually apply any config
 12      --location LOCATION  Location to work on
 13      --token TOKEN        admin access token to use.
 14      --cleanup            remove all pooling HGs
 15"""
 16import asyncio
 17import logging
 18import os
 19import sys
 20from argparse import ArgumentParser, Namespace
 21from collections import defaultdict
 22from operator import attrgetter
 23from typing import Optional, Union
 24
 25from dotenv import load_dotenv
 26
 27from wxc_sdk.as_api import AsWebexSimpleApi
 28from wxc_sdk.common import AlternateNumber, IdAndName, RingPattern
 29from wxc_sdk.integration import Integration
 30from wxc_sdk.scopes import parse_scopes
 31from wxc_sdk.telephony import NumberListPhoneNumber
 32from wxc_sdk.telephony.forwarding import CallForwarding
 33from wxc_sdk.telephony.huntgroup import HuntGroup
 34from wxc_sdk.tokens import Tokens
 35
 36POOL_HG_NAME = 'POOL_'
 37
 38
 39def env_path() -> str:
 40    """
 41    determine path for .env to load environment variables from; based on name of this file
 42    :return: .env file path
 43    """
 44    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 45
 46
 47def yml_path() -> str:
 48    """
 49    determine path of YML file to persist tokens
 50    :return: path to YML file
 51    :rtype: str
 52    """
 53    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 54
 55
 56def build_integration() -> Integration:
 57    """
 58    read integration parameters from environment variables and create an integration
 59    :return: :class:`wxc_sdk.integration.Integration` instance
 60    """
 61    client_id = os.getenv('INTEGRATION_CLIENT_ID')
 62    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
 63    scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
 64    redirect_url = 'http://localhost:6001/redirect'
 65    if not all((client_id, client_secret, scopes)):
 66        raise ValueError('failed to get integration parameters from environment')
 67    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 68                       redirect_url=redirect_url)
 69
 70
 71def get_tokens() -> Optional[Tokens]:
 72    """
 73    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 74
 75    :return: tokens
 76    :rtype: :class:`wxc_sdk.tokens.Tokens`
 77    """
 78
 79    integration = build_integration()
 80    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
 81    return tokens
 82
 83
 84async def location_id_from_args(api: AsWebexSimpleApi, args: Namespace) -> Optional[str]:
 85    """
 86    Get location id for --location parameter
 87    """
 88    if not args.location:
 89        return None
 90    location = next((loc
 91                     for loc in await api.locations.list(name=args.location)
 92                     if loc.name == args.location),
 93                    None)
 94    return location and location.location_id
 95
 96
 97async def cleanup(api: AsWebexSimpleApi, args: Namespace):
 98    """
 99    clean up (delete) pooling HGs
100    """
101    location_id = await location_id_from_args(api, args)
102    existing_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location_id)
103                        if hg.name.startswith(POOL_HG_NAME)]
104
105    async def delete_one(hg: HuntGroup):
106        if not args.test:
107            await api.telephony.huntgroup.delete_huntgroup(location_id=hg.location_id,
108                                                           huntgroup_id=hg.id)
109        print(f'Deleted HG "{hg.name}" in location "{hg.location_name}"')
110
111    await asyncio.gather(*[delete_one(hg)
112                           for hg in existing_hg_list])
113
114
115async def pool_tns_location(api: AsWebexSimpleApi, args: Namespace, location: IdAndName,
116                            tns: list[NumberListPhoneNumber]):
117    """
118    Pool unassigned TNs for a given location
119    """
120
121    async def add_to_hg(hg: Union[HuntGroup, str],
122                        tns: list[NumberListPhoneNumber]):
123        """
124        Add a bunch of TNs to given HG
125        :param hg: can be an existing HG or a name of a HG to be created
126        :param tns: list of TNs to be added to the HG
127        """
128        if isinstance(hg, str):
129            # create new HG
130            print(f'Creating HG "{hg}" in location "{location.name}" for: '
131                  f'{", ".join(tn.phone_number for tn in tns)}')
132            if args.test:
133                return
134            settings = HuntGroup.create(name=hg,
135                                        phone_number=tns[0].phone_number)
136            new_id = await api.telephony.huntgroup.create(location_id=location.id,
137                                                          settings=settings)
138            # Get details of new HG and also the forwarding settings
139            # * details are needed for the recursive call to add_to_hg .. in case we have more than one TN to add
140            # * ... and forwarding settings are needed b/c we want to set CFwdAll to location's main number
141            details, forwarding = await asyncio.gather(
142                api.telephony.huntgroup.details(location_id=location.id, huntgroup_id=new_id),
143                api.telephony.huntgroup.forwarding.settings(location_id=location.id, feature_id=new_id))
144            forwarding: CallForwarding
145            details: HuntGroup
146
147            # set call forwarding
148            print(f'HG "{hg}" in location "{location.name}": set CFwdAll to {main_number}')
149            forwarding.always.enabled = True
150            forwarding.always.destination = main_number
151            await api.telephony.huntgroup.forwarding.update(location_id=location.id, feature_id=new_id,
152                                                            forwarding=forwarding)
153            if len(tns) > 1:
154                # add the remaining as alternate numbers
155                await add_to_hg(hg=details, tns=tns[1:])
156            return
157
158        # add tns as alternate numbers
159        print(f'HG "{hg.name}" in location "{location.name}", adding: '
160              f'{", ".join(tn.phone_number for tn in tns)}')
161        if args.test:
162            return
163        # weirdly on GET alternate numbers are returned in alternate_number_settings ...
164        alternate_numbers = hg.alternate_number_settings.alternate_numbers
165        alternate_numbers.extend(AlternateNumber(phone_number=tn.phone_number,
166                                                 ring_pattern=RingPattern.normal)
167                                 for tn in tns)
168        # ... while for an update Wx expects the alternate numbers in an alternate_number attribute
169        update = HuntGroup(alternate_numbers=alternate_numbers)
170        await api.telephony.huntgroup.update(location_id=location.id,
171                                             huntgroup_id=hg.id,
172                                             update=update)
173
174        return
175
176    # if the list of available TNs includes the main number then something is wonky. The main number should be owned
177    # by something
178    main_number = next((tn for tn in tns if tn.main_number), None)
179    if main_number is not None:
180        print(f'Error: main number {main_number.phone_number} in location "{location.name}" is not assigned to '
181              f'anything!', file=sys.stderr)
182        return
183
184    # get main number of location
185    main_number = (await api.telephony.location.details(location_id=location.id)).calling_line_id.phone_number
186
187    # get list if "pool" HGs in location
188    existing_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location.id)
189                        if hg.name.startswith(POOL_HG_NAME)]
190
191    # we need the details for all HGs: list() response is missing the alternate number list
192    # get all HG details in parallel
193    existing_hg_list = await asyncio.gather(*[api.telephony.huntgroup.details(location_id=location.id,
194                                                                              huntgroup_id=hg.id)
195                                              for hg in existing_hg_list])
196    existing_hg_list: list[HuntGroup]
197
198    # start with an empty list of tasks
199    tasks = []
200
201    # assign TNs to existing "pool" HGs
202
203    # these are the HGs we can still assign some TNs to
204    hgs_with_open_slots = (hg
205                           for hg in existing_hg_list
206                           if len(hg.alternate_number_settings.alternate_numbers) < 10)
207    tns.sort(key=attrgetter('phone_number'))
208    while tns:
209        hg_with_open_slots = next(hgs_with_open_slots, None)
210        if hg_with_open_slots is None:
211            # no more HGs we can assign TNs to --> we are done here
212            break
213
214        # add some tns to this hg; hg can have max 10 alternate numbers
215        tns_to_add = 10 - len(hg_with_open_slots.alternate_number_settings.alternate_numbers)
216        tasks.append(add_to_hg(hg=hg_with_open_slots, tns=tns[:tns_to_add]))
217
218        # continue w/ remaining TNs
219        tns = tns[tns_to_add:]
220
221    # assign remaining TNs to new "pool" HGs
222    # .. in batches of 11
223    existing_names = set(hg.name for hg in existing_hg_list)
224    new_hg_names = (name
225                    for i in range(1, 1000)
226                    if (name := f'{POOL_HG_NAME}{i:03d}') not in existing_names)
227    while tns:
228        tasks.append(add_to_hg(hg=next(new_hg_names), tns=tns[:11]))
229        tns = tns[11:]
230
231    # Now run all tasks
232    await asyncio.gather(*tasks)
233    return
234
235
236async def pool_tns(api: AsWebexSimpleApi, args: Namespace):
237    """
238    Assign unassigned numbers to pool HGs
239    """
240    location_id = await location_id_from_args(api, args)
241
242    # get available TNs. If a location argument was present then limit to that location
243    numbers = await api.telephony.phone_numbers(available=True, location_id=location_id)
244
245    # we need to work on TNs by location ...
246    tns_by_location: dict[str, list[NumberListPhoneNumber]] = defaultdict(list)
247    # ... and we want to collect location information (specifically the name)
248    locations: dict[str, IdAndName] = dict()
249    for tn in numbers:
250        locations[tn.location.id] = tn.location
251        tns_by_location[tn.location.id].append(tn)
252
253    # work on all locations in parallel
254    await asyncio.gather(*[pool_tns_location(api=api, args=args,
255                                             location=locations[location_id], tns=tns)
256                           for location_id, tns in tns_by_location.items()])
257
258
259async def catch_tns():
260    """
261    Main async logic
262    """
263    parser = ArgumentParser()
264    parser.add_argument('--test', required=False, help='test only; don\'t actually apply any config',
265                        action='store_true')
266    parser.add_argument('--location', required=False, help='Location to work on', type=str)
267    parser.add_argument('--token', type=str, required=False, help='admin access token to use.')
268    parser.add_argument('--cleanup', required=False, help='remove all pooling HGs', action='store_true')
269    args = parser.parse_args()
270
271    load_dotenv(env_path(), override=True)
272
273    tokens = args.token or None
274
275    if tokens is None:
276        # get tokens from cache or create a new set of tokens using the integration defined in .env
277        tokens = get_tokens()
278    async with AsWebexSimpleApi(tokens=tokens) as api:
279        if args.cleanup:
280            await cleanup(api, args)
281        else:
282            await pool_tns(api, args)
283
284
285if __name__ == '__main__':
286    logging.basicConfig(level=logging.INFO)
287    asyncio.run(catch_tns())

Downgrade room device workspaces from Webex Calling to free calling

This script looks for workspaces in a given location (or all workspaces) and downgrades them from Webex Calling to free calling.

usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}

CLI tool to manage room device calling entitlements

positional arguments:
  {show,clear}         show: show all room devices with their calling settings, clear:
                       remove calling license from devices

options:
  -h, --help           show this help message and exit
  --location LOCATION  work on devices in given location
  --wsnames WSNAMES    file name of a file with workspace names to operate on; one name per
                       line
  --test               test run only
(wxc-sdk-py3.11) ➜  examples git:(master) ✗ ./room_devices.py --help
usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}

CLI tool to manage room device calling entitlements

positional arguments:
  {show,clear}         show: show all room devices with their calling settings, clear: remove
                       calling license from devices

options:
  -h, --help           show this help message and exit
  --location LOCATION  work on devices in given location
  --wsnames WSNAMES    file name of a file with workspace names to operate on; one name per
                       line
  --test               test run only

Source: room_devices.py

  1#!/usr/bin/env python
  2"""
  3usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}
  4
  5CLI tool to manage room device calling entitlements
  6
  7positional arguments:
  8  {show,clear}         show: show all room devices with their calling settings, clear: remove calling
  9                       license from devices
 10
 11optional arguments:
 12  -h, --help           show this help message and exit
 13  --location LOCATION  work on devices in given location
 14  --wsnames WSNAMES    file name of a file with workspace names to operate on; one name per line
 15  --test               test run only
 16
 17The tool reads environment variables from room_devices.env:
 18    SERVICE_APP_CLIENT_ID=<clients id of a service app created on developer.webex.com>
 19    SERVICE_APP_CLIENT_SECRET=<clients secret of a service app created on developer.webex.com>
 20    SERVICE_APP_REFRESH_TOKEN=<refresh token of the service app obtained after the service app has been
 21                                    authorized for an org>
 22
 23This information is used to obtain an access token required to authorize API access
 24
 25This is a super-set of the scopes the service app needs:
 26    * spark-admin:workspaces_write
 27    * Identity:one_time_password
 28    * identity:placeonetimepassword_create
 29    * spark:people_read
 30    * spark-admin:workspace_locations_read
 31    * spark-admin:workspaces_read
 32    * spark:devices_write
 33    * spark:devices_read
 34    * spark:kms
 35    * spark-admin:devices_read
 36    * spark-admin:workspace_locations_write
 37    * spark-admin:licenses_read
 38    * spark-admin:telephony_config_read
 39    * spark-admin:telephony_config_write
 40    * spark-admin:devices_write
 41    * spark-admin:people_read
 42
 43More service app details: https://developer.webex.com/docs/service-apps
 44
 45Tokens get persisted in room_devices.yml.
 46"""
 47
 48import asyncio
 49import logging
 50import sys
 51import time
 52from argparse import ArgumentParser
 53from collections import defaultdict
 54from functools import reduce
 55from itertools import chain
 56from os import getcwd, getenv
 57from os.path import basename, isfile, join, splitext
 58from typing import Optional
 59
 60from dotenv import load_dotenv
 61from yaml import safe_dump, safe_load
 62
 63from wxc_sdk.as_api import AsWebexSimpleApi
 64from wxc_sdk.base import webex_id_to_uuid
 65from wxc_sdk.common import OwnerType
 66from wxc_sdk.devices import Device
 67from wxc_sdk.integration import Integration
 68from wxc_sdk.locations import Location
 69from wxc_sdk.telephony import NumberListPhoneNumber
 70from wxc_sdk.tokens import Tokens
 71from wxc_sdk.workspace_locations import WorkspaceLocation
 72from wxc_sdk.workspaces import CallingType, Workspace, WorkspaceCalling, WorkspaceSupportedDevices
 73
 74
 75def yml_path() -> str:
 76    """
 77    Get filename for YML file to cache access and refresh token
 78    """
 79    return f'{splitext(basename(__file__))[0]}.yml'
 80
 81
 82def env_path() -> str:
 83    """
 84    Get path to .env file to read service app settings from
 85    """
 86    return f'{splitext(basename(__file__))[0]}.env'
 87
 88
 89def read_tokens_from_file() -> Optional[Tokens]:
 90    """
 91    Get service app tokens from cache file, return None if cache does not exist or read fails
 92    """
 93    path = yml_path()
 94    if not isfile(path):
 95        return None
 96    try:
 97        with open(path) as f:
 98            data = safe_load(f)
 99        tokens = Tokens.model_validate(data)
100    except Exception:
101        return None
102    return tokens
103
104
105def write_tokens_to_file(tokens: Tokens):
106    """
107    Write tokens to cache
108    """
109    with open(yml_path(), mode='w') as f:
110        safe_dump(tokens.model_dump(exclude_none=True), f)
111
112
113def get_access_token() -> Tokens:
114    """
115    Get a new access token using refresh token, service app client id, service app client secret
116    """
117    tokens = Tokens(refresh_token=getenv('SERVICE_APP_REFRESH_TOKEN'))
118    integration = Integration(
119        client_id=getenv('SERVICE_APP_CLIENT_ID'),
120        client_secret=getenv('SERVICE_APP_CLIENT_SECRET'),
121        scopes=[],
122        redirect_url=None,
123    )
124    integration.refresh(tokens=tokens)
125    write_tokens_to_file(tokens)
126    return tokens
127
128
129def get_tokens() -> Optional[Tokens]:
130    """
131    Get tokens from cache or create new access token using service app credentials
132    """
133    # try to read from file
134    tokens = read_tokens_from_file()
135    # .. or create new access token using refresh token
136    if tokens is None:
137        tokens = get_access_token()
138    if tokens.remaining < 24 * 60 * 60:
139        tokens = get_access_token()
140    return tokens
141
142
143def main() -> int:
144    """
145    Main code
146    """
147    # parse args
148    parser = ArgumentParser(prog=basename(__file__), description='CLI tool to manage room device calling entitlements')
149    parser.add_argument(
150        'operation',
151        choices=['show', 'clear'],
152        help='show: show all room devices with their calling settings, clear: remove calling license from devices',
153    )
154    parser.add_argument('--location', type=str, help='work on devices in given location')
155    parser.add_argument(
156        '--wsnames', type=str, help='file name of a file with workspace names to operate on; one name per line'
157    )
158    parser.add_argument('--test', action='store_true', help='test run only')
159    args = parser.parse_args()
160    operation = args.operation
161    test_run = args.test
162    location = args.location
163    ws_names = args.wsnames
164
165    # get tokens; as an alternative you can just get a developer token from developer.webex.com and use:
166    #   tokens = '<developer token from developer.webex.com>'
167    load_dotenv(dotenv_path=env_path())
168    err = ''
169    tokens = None
170    try:
171        tokens = get_tokens()
172    except Exception as e:
173        err = f'{e}'
174    if not tokens:
175        print(f'failed to obtain access tokens: {err}', file=sys.stderr)
176        return 1
177
178    async def as_main() -> int:
179        """
180        Async main to be able to use concurrency
181        """
182
183        async def downgrade_workspace(ws: Workspace):
184            """
185            Downgrade one workspace to free calling
186            """
187
188            def log(s: str, file=sys.stdout):
189                print(f'downgrade workspace "{ws.display_name:{ws_name_len}}": {s}', file=file)
190
191            if ws.calling.type != CallingType.webex:
192                raise ValueError(f'calling type is "{ws.calling.type}", not "{CallingType.webex.value}"')
193            if test_run:
194                log('skipping update, test run only')
195            else:
196                log('updating calling settings')
197                update = ws.model_copy(deep=True)
198                update.calling = WorkspaceCalling(type=CallingType.free)
199                update.workspace_location_id = None
200                update.location_id = None
201                await api.workspaces.update(workspace_id=ws.workspace_id, settings=update)
202            log('done')
203
204        async with AsWebexSimpleApi(tokens=tokens) as api:
205            # get list of locations and workspace locations
206            ws_location_list, location_list = await asyncio.gather(
207                api.workspace_locations.list(display_name=location), api.locations.list(name=location)
208            )
209            location_list: list[Location]
210            ws_location_list: list[WorkspaceLocation]
211
212            # validate location argument
213            if location:
214                target_location = next((loc for loc in location_list if loc.name == location), None)
215                target_ws_location = next((loc for loc in ws_location_list if loc.display_name == location), None)
216                if not all((target_ws_location, target_location)):
217                    print(f'location "{location}" not found', file=sys.stderr)
218                    return 1
219            else:
220                target_location = None
221                target_ws_location = None
222
223            # get workspaces, numbers, and devices (in target location)
224            workspaces, numbers, devices = await asyncio.gather(
225                api.workspaces.list(workspace_location_id=target_ws_location and target_ws_location.id),
226                api.telephony.phone_numbers(
227                    location_id=target_location and target_location.location_id, owner_type=OwnerType.place
228                ),
229                api.devices.list(
230                    workspace_location_id=target_ws_location and target_ws_location.id, product_type='roomdesk'
231                ),
232            )
233            workspaces: list[Workspace]
234            numbers: list[NumberListPhoneNumber]
235            devices: list[Device]
236
237            # only workspaces supporting desk devices
238            workspaces = [
239                ws for ws in workspaces if ws.supported_devices == WorkspaceSupportedDevices.collaboration_devices
240            ]
241
242            # if a path to a file with workspace names was given, then filter based on the file contents
243            if ws_names:
244                with open(ws_names) as f:
245                    workspace_names = set(s_line for line in f if (s_line := line.strip()))
246                workspaces = [ws for ws in workspaces if ws.display_name in workspace_names]
247            if not workspaces:
248                print('No workspaces', file=sys.stderr)
249                return 1
250
251            # only devices in workspaces (no personal devices)
252            devices = [d for d in devices if d.workspace_id is not None]
253
254            # prepare some lookups
255            workspace_locations_by_id: dict[str, WorkspaceLocation] = {wsl.id: wsl for wsl in ws_location_list}
256            numbers_by_workspace_uuid: dict[str, list[NumberListPhoneNumber]] = reduce(
257                lambda r, el: r[webex_id_to_uuid(el.owner.owner_id)].append(el) or r, numbers, defaultdict(list)
258            )
259            devices_by_workspace_id: dict[str, list[Device]] = reduce(
260                lambda r, el: r[el.workspace_id].append(el) or r, devices, defaultdict(list)
261            )
262
263            # sort workspaces by workspace location name and workspace name; workspace location can be unset
264            workspaces.sort(
265                key=lambda ws: (
266                    ''
267                    if not ws.workspace_location_id
268                    else workspace_locations_by_id[ws.workspace_location_id].display_name,
269                    ws.display_name,
270                )
271            )
272            # some field lengths for nicer output
273            wsl_name_len = max(len(wsl.display_name) for wsl in ws_location_list)
274            ws_name_len = max(len(ws.display_name) for ws in workspaces)
275
276            # ... chain([1], ...) to avoid max() on empty sequence
277            pn_len = max(chain([1], (len(n.phone_number) for n in numbers if n.phone_number)))
278            ext_len = max(chain([1], (len(n.extension) for n in numbers if n.extension)))
279
280            # print workspaces with workspace locations, numbers, and devices
281            for workspace in workspaces:
282                if not workspace.workspace_location_id:
283                    wsl_name = ''
284                else:
285                    wsl_name = workspace_locations_by_id[workspace.workspace_location_id].display_name
286                print(
287                    f'workspace location "{wsl_name:{wsl_name_len}}", '
288                    f'workspace "{workspace.display_name:{ws_name_len}}"'
289                )
290
291                # are there any numbers in that workspace?
292                numbers = numbers_by_workspace_uuid.get(webex_id_to_uuid(workspace.workspace_id))
293                if numbers:
294                    for number in numbers:
295                        print(
296                            f'  number: {number.phone_number or "-" * pn_len:{pn_len}}/'
297                            f'{number.extension or "-" * ext_len:{ext_len}}'
298                        )
299                devices = devices_by_workspace_id.get(workspace.workspace_id)
300                if devices:
301                    for device in devices:
302                        print(f'  device: {device.display_name}')
303
304            if operation == 'show':
305                # we are done here
306                return 0
307
308            # now we want to downgrade (disable calling) on all workspaces
309            print()
310            print('Starting downgrade')
311            results = await asyncio.gather(*[downgrade_workspace(ws) for ws in workspaces], return_exceptions=True)
312
313            # print errors ... if any
314            for ws, result in zip(workspaces, results):
315                ws: Workspace
316                if isinstance(result, Exception):
317                    print(f'Failed to downgrade "{ws.display_name:{ws_name_len}}": {result}', file=sys.stderr)
318
319            if any(isinstance(r, Exception) for r in results):
320                return 1
321            return 0
322
323    return asyncio.run(as_main())
324
325
326if __name__ == '__main__':
327    root_logger = logging.getLogger()
328    h = logging.StreamHandler(stream=sys.stderr)
329    h.setLevel(logging.INFO)
330    root_logger.setLevel(logging.INFO)
331    root_logger.addHandler(h)
332
333    # log REST API interactions to file
334    file_fmt = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s')
335    file_fmt.converter = time.gmtime
336
337    rest_log_name = join(getcwd(), f'{splitext(basename(__file__))[0]}.log')
338    rest_log_handler = logging.FileHandler(rest_log_name, mode='w')
339    rest_log_handler.setLevel(logging.DEBUG)
340    rest_log_handler.setFormatter(file_fmt)
341    rest_logger = logging.getLogger('wxc_sdk.as_rest')
342    rest_logger.setLevel(logging.DEBUG)
343    rest_logger.addHandler(rest_log_handler)
344
345    exit(main())

Logout users by selectively revoking authorizations

Selectively revoke authorizations.

usage: logout_users.py [-h] [--appname APPNAME] [--test] email

CLI tool to logout users by revoking user authorizations

positional arguments:
  email              single email or path to file w/ email addresses (one email address per
                     line). "all" can be used to print authorizations for all users. "all"
                     cannot be combined with other parameters and no authorizations will be
                     revoked in this case."

options:
  -h, --help         show this help message and exit
  --appname APPNAME  regular expression matching authorization application names. When missing
                     authorizations for all client ids defined in the script are revoked
  --test             test run only

Source: logout_users.py

  1#!/usr/bin/env python
  2"""
  3    usage: logout_users.py [-h] [--appname APPNAME] [--test] email
  4
  5    CLI tool to logout users by revoking user authorizations
  6
  7    positional arguments:
  8      email              single email or path to file w/ email addresses (one email address per line). "all" can be
  9                         used to print authorizations for all users. "all" cannot be combined with other parameters
 10                         and no authorizations will be revoked in this case."
 11
 12    optional arguments:
 13      -h, --help         show this help message and exit
 14      --appname APPNAME  regular expression matching authorization application names. When missing authorizations for
 15                         all client ids defined in the script are revoked
 16      --test             test run only
 17
 18
 19    The script uses a service app to access the authorization APIs:
 20    https://developer.webex.com/admin/docs/api/v1/authorizations
 21
 22    The service app needs these scopes: identity:tokens_write identity:tokens_read spark-admin:people_read
 23
 24    Service app details are read from "logout_users.env":
 25
 26        SERVICE_APP_CLIENT_ID=<client id>>
 27        SERVICE_APP_CLIENT_SECRET=<client secret>>
 28        SERVICE_APP_REFRESH_TOKEN=<service app refresh token>>
 29"""
 30import asyncio
 31import logging
 32import re
 33import sys
 34import time
 35from argparse import ArgumentParser
 36from itertools import chain
 37from operator import attrgetter
 38from os import getcwd, getenv
 39from os.path import join, splitext, basename, isfile
 40from typing import Optional
 41
 42from dotenv import load_dotenv
 43from yaml import safe_dump, safe_load
 44
 45from wxc_sdk.as_api import AsWebexSimpleApi
 46from wxc_sdk.authorizations import Authorization, AuthorizationType
 47from wxc_sdk.integration import Integration
 48from wxc_sdk.people import Person
 49from wxc_sdk.tokens import Tokens
 50
 51# list of client ids to revoke authorizations for
 52# add more client ids as needed
 53CLIENT_IDS = {
 54    # Webex Web Client
 55    'C64ab04639eefee4798f58e7bc3fe01d47161be0d97ff0d31e040a6ffe66d7f0a',
 56    # Webex Teams Desktop Client for Mac
 57    'Ccb2581f071a0714c8ab7d4777f70ebed26f1ef5f3261597f00afbb3a53c1ad88',
 58    # add more client ids as required, call the script with "all" parameter to identify more client ids
 59}
 60
 61
 62def yml_path() -> str:
 63    """
 64    Get filename for YML file to cache access and refresh token
 65    """
 66    return f'{splitext(basename(__file__))[0]}.yml'
 67
 68
 69def env_path() -> str:
 70    """
 71    Get path to .env file to read service app settings from
 72    """
 73    return f'{splitext(basename(__file__))[0]}.env'
 74
 75
 76def read_tokens_from_file() -> Optional[Tokens]:
 77    """
 78    Get service app tokens from cache file, return None if cache does not exist or read fails
 79    """
 80    path = yml_path()
 81    if not isfile(path):
 82        return None
 83    try:
 84        with open(path, mode='r') as f:
 85            data = safe_load(f)
 86        tokens = Tokens.model_validate(data)
 87    except Exception:
 88        return None
 89    return tokens
 90
 91
 92def write_tokens_to_file(tokens: Tokens):
 93    """
 94    Write tokens to cache
 95    """
 96    with open(yml_path(), mode='w') as f:
 97        safe_dump(tokens.model_dump(exclude_none=True), f)
 98
 99
100def get_access_token() -> Tokens:
101    """
102    Get a new access token using refresh token, service app client id, service app client secret
103    """
104    tokens = Tokens(refresh_token=getenv('SERVICE_APP_REFRESH_TOKEN'))
105    integration = Integration(client_id=getenv('SERVICE_APP_CLIENT_ID'),
106                              client_secret=getenv('SERVICE_APP_CLIENT_SECRET'),
107                              scopes=[], redirect_url=None)
108    integration.refresh(tokens=tokens)
109    write_tokens_to_file(tokens)
110    return tokens
111
112
113def get_tokens() -> Optional[Tokens]:
114    """
115    Get tokens from cache or create new access token using service app credentials
116    """
117    # try to read from file
118    tokens = read_tokens_from_file()
119    # .. or create new access token using refresh token
120    if tokens is None:
121        tokens = get_access_token()
122    if tokens.remaining < 24 * 60 * 60:
123        tokens = get_access_token()
124    return tokens
125
126
127def auth_str(auth: Authorization) -> str:
128    return f'{auth.type:7}, {auth.client_id} - {auth.application_name}'
129
130
131async def main() -> int:
132    parser = ArgumentParser(prog=basename(__file__),
133                            description='CLI tool to logout users by revoking user authorizations')
134    parser.add_argument('email',
135                        help='single email or path to file w/ email addresses (one email address per line). "all" can '
136                             'be used to print authorizations for all users. "all" cannot be combined with other '
137                             'parameters and no authorizations will be revoked in this case."')
138    parser.add_argument('--appname',
139                        type=str,
140                        help='regular expression matching authorization application names. '
141                             'When missing authorizations for all client ids defined in the '
142                             'script are revoked')
143    parser.add_argument('--test', action='store_true', help='test run only')
144    args = parser.parse_args()
145    email = args.email
146    test_run = args.test
147    appname = args.appname
148
149    if appname:
150        try:
151            appname_re = re.compile(appname)
152        except re.error as e:
153            print(f'invalid regular expression for --appname: {e}')
154            return 1
155
156    async def work_on_one_email(api: AsWebexSimpleApi, user_email: str) -> int:
157        """
158        Work on authorizations for one email
159        """
160        print(f'Getting authorizations for {user_email}')
161        auths = await api.authorizations.list(person_email=user_email)
162
163        auths.sort(key=lambda a: f'{a.application_name}{a.type}')
164
165        # determine set of authorization ids to revoke
166        if appname:
167            auths_to_delete = set(a.id for a in auths
168                                  if appname_re.match(a.application_name))
169        else:
170            auths_to_delete = set(a.id for a in auths
171                                  if a.client_id in CLIENT_IDS)
172        if auths:
173            # show all authorizations and indicate which will be revoked
174            print('\n'.join(f'{user_email}: {auth_str(auth)} '
175                            f'{"--> revoke" if auth.id in auths_to_delete else ""}'
176                            for auth in auths))
177
178        if not auths:
179            print(f'{user_email}: no auths found')
180            return 0
181        if not auths_to_delete:
182            print(f'{user_email}: no auths to revoke')
183            return 0
184        if test_run:
185            print(f'{user_email}: testrun, not revoking any auths')
186            return 0
187        results = await asyncio.gather(*[api.authorizations.delete(authorization_id=a_id)
188                                         for a_id in auths_to_delete],
189                                       return_exceptions=True)
190        err = False
191        for r, auth_id in zip(results, auths_to_delete):
192            if not isinstance(r, Exception):
193                continue
194            auth = next((a for a in auths if a.id == auth_id))
195            if auth.type == AuthorizationType.refresh:
196                # ignore errors on revoking access tokens (race condition)
197                continue
198            print(f'{user_email}: {auth_str(auth)}, error revoking auth: {r}')
199            err = True
200        if err:
201            return 1
202        return 0
203
204    # get tokens; as an alternative you can just get a developer token from developer.webex.com and use:
205    #   tokens = '<developer token from developer.webex.com>'
206    load_dotenv(dotenv_path=env_path())
207    err = ''
208    tokens = None
209    try:
210        tokens = get_tokens()
211    except Exception as e:
212        err = f'{e}'
213    if not tokens:
214        print(f'failed to obtain access tokens: {err}', file=sys.stderr)
215        return 1
216
217    async with AsWebexSimpleApi(tokens=tokens) as api:
218        if email == 'all':
219            print('Getting users...')
220            users = await api.people.list()
221            print(f'Getting authorizations for {len(users)} users...')
222            auth_lists = await asyncio.gather(*[api.authorizations.list(person_id=user.person_id)
223                                                for user in users],
224                                              return_exceptions=True)
225            err = False
226            for user, error in zip(users, auth_lists):
227                user: Person
228                if isinstance(error, Exception):
229                    print(f'{user.emails}: failed to get authorizations: {error}')
230                    err = True
231            if err:
232                return 1
233            auth_dict: dict[str, list[Authorization]] = {person.person_id: auth_list
234                                                         for person, auth_list in zip(users, auth_lists)
235                                                         if auth_list}
236            print('\n'.join(chain.from_iterable((f'{user.emails[0]}: {auth_str(auth)}'
237                                                 for auth in auth_dict.get(user.person_id, []))
238                                                for user in sorted(users, key=attrgetter('display_name')))))
239
240            clients = set(auth_str(auth) for auth in chain.from_iterable(auth_lists))
241            print()
242            print('tokens and clients:')
243            print('\n'.join(sorted(clients)))
244            return 0
245        elif isfile(email):
246            # read email addresses from file, one email address per line
247            with open(email, mode='r') as f:
248                emails = list(sorted(set(s_line for line in f if (s_line := line.strip()))))
249        else:
250            emails = [email]
251
252        if not emails:
253            print('Nothing to do')
254            return 0
255
256        # act on all emails concurrently
257        results = await asyncio.gather(*[work_on_one_email(api, e)
258                                         for e in emails],
259                                       return_exceptions=True)
260    err = next((r for r in results if isinstance(r, Exception)), None)
261    if err:
262        raise err
263    return max(results)
264
265
266if __name__ == '__main__':
267    root_logger = logging.getLogger()
268    h = logging.StreamHandler(stream=sys.stderr)
269    h.setLevel(logging.DEBUG)
270    root_logger.setLevel(logging.DEBUG)
271    root_logger.addHandler(h)
272
273    # log REST API interactions to file
274    file_fmt = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s')
275    file_fmt.converter = time.gmtime
276
277    rest_log_name = join(getcwd(), f'{splitext(basename(__file__))[0]}.log')
278    rest_log_handler = logging.FileHandler(rest_log_name, mode='w')
279    rest_log_handler.setLevel(logging.DEBUG)
280    rest_log_handler.setFormatter(file_fmt)
281    rest_logger = logging.getLogger('wxc_sdk.as_rest')
282    rest_logger.setLevel(logging.DEBUG)
283    rest_logger.addHandler(rest_log_handler)
284
285    exit(asyncio.run(main()))

Leave spaces with no activity in the last n days

Leave spaces w/o recent activity.

usage: leave_spaces.py [-h] [--days DAYS] [--token TOKEN] [--no_test] [--no_messages]
                       [--keep KEEP]

leave spaces with no activity

options:
  -h, --help            show this help message and exit
  --days DAYS, -d DAYS  days since last activity; default: 1095
  --token TOKEN         Personal access token to use. If not provided script will try to read
                        token from WEBEX_ACCESS_TOKEN environment variable.
  --no_test             Don't test; actually leave the spaces
  --no_messages         Only leave spaces that have no messages
  --keep KEEP, -k KEEP  file with list of spaces to keep

Source: leave_spaces.py

  1#!/usr/bin/env python
  2import asyncio
  3import logging
  4import os
  5import sys
  6from argparse import ArgumentParser
  7from collections.abc import Iterable
  8from datetime import datetime, timedelta
  9from typing import Optional
 10
 11from dateutil import tz
 12from dotenv import load_dotenv
 13
 14from wxc_sdk.as_api import AsWebexSimpleApi
 15from wxc_sdk.as_rest import AsRestError
 16from wxc_sdk.common import RoomType
 17from wxc_sdk.messages import Message
 18from wxc_sdk.rooms import Room
 19from wxc_sdk.teams import Team
 20
 21
 22async def last_n_messages(api: AsWebexSimpleApi, space: Room, n: int = 10) -> list[Message]:
 23    """
 24    Get last n messages in a space
 25    """
 26    messages = []
 27    if not n:
 28        return messages
 29    async for message in api.messages.list_gen(room_id=space.id, max=min(n, 1000)):
 30        messages.append(message)
 31        n -= 1
 32        if not n:
 33            break
 34    return messages
 35
 36
 37async def latest_message_in_space(api: AsWebexSimpleApi, space: Room) -> Optional[Message]:
 38    """
 39    Get latest message in a space
 40    """
 41    last_messages = await last_n_messages(api, space, 1)
 42    if not last_messages:
 43        return None
 44    return last_messages[0]
 45
 46
 47async def verify_leave_space(api: AsWebexSimpleApi, space: Room, cutoff: datetime) -> bool:
 48    """
 49    Get latest message in a space and check if it's older than cutoff
 50    """
 51    latest = await latest_message_in_space(api, space)
 52    if not latest:
 53        return True
 54    return latest.created <= cutoff
 55
 56
 57async def leave_spaces(api: AsWebexSimpleApi, spaces: Iterable[Room]):
 58    """
 59    Leave some spaces
 60    """
 61    me = await api.people.me()
 62    person_id = me.person_id
 63
 64    async def leave_space(space: Room):
 65        # get membership to delete
 66        try:
 67            memberships = await api.membership.list(room_id=space.id, person_id=person_id)
 68            if not memberships:
 69                print(f'No membership in space "{space.title}", skipping', file=sys.stderr)
 70                return
 71            # delete membership
 72            print(f'Leaving space "{space.title}"...')
 73            await api.membership.delete(memberships[0].id)
 74            print(f'Left space "{space.title}"')
 75        except AsRestError as e:
 76            print(f'Error leaving space "{space.title}": {e}', file=sys.stderr)
 77        return
 78
 79    await asyncio.gather(*[leave_space(space) for space in spaces])
 80
 81
 82async def as_main():
 83    # parse args
 84    parser = ArgumentParser(description='leave spaces with no activity')
 85    parser.add_argument(
 86        '--days', '-d', type=int, required=False, default=3 * 365, help=f'days since last activity; default: {3 * 365}'
 87    )
 88    parser.add_argument(
 89        '--token',
 90        type=str,
 91        required=False,
 92        help='Personal access token to use. If not provided script will try to read token from '
 93        'WEBEX_ACCESS_TOKEN environment variable.',
 94    )
 95    parser.add_argument('--no_test', action='store_true', required=False, help="Don't test; actually leave the spaces")
 96    parser.add_argument(
 97        '--no_messages', action='store_true', required=False, help='Only leave spaces that have no messages'
 98    )
 99    parser.add_argument('--keep', '-k', type=str, required=False, help='file with list of spaces to keep')
100    args = parser.parse_args()
101
102    load_dotenv(override=True)
103    token = args.token or os.getenv('WEBEX_ACCESS_TOKEN')
104    if not token:
105        print('No token provided and WEBEX_ACCESS_TOKEN not set in environment', file=sys.stderr)
106        exit(1)
107    cutoff = datetime.now(tz=tz.UTC) - timedelta(days=args.days)
108    if args.keep:
109        try:
110            with open(args.keep) as f:
111                keep = set(l.strip() for l in f if l)
112        except FileNotFoundError:
113            print(f'file "{args.keep}" not found', file=sys.stderr)
114            exit(1)
115    else:
116        keep = set()
117    async with AsWebexSimpleApi(tokens=token, concurrent_requests=100) as api:
118        # check token
119        try:
120            await api.people.me()
121        except AsRestError as e:
122            if e.status == 401:
123                print(f'Token seems to be invalid: {e}', file=sys.stderr)
124                exit(1)
125            raise
126
127        print('Listing spaces and teams...')
128        spaces, teams_list = await asyncio.gather(api.rooms.list(max=1000), api.teams.list())
129        # we can't leave direct spaces anyway; so ignore them from the start
130        spaces = [space for space in spaces if space.type != RoomType.direct]
131        print(f'Found {len(spaces)} spaces')
132        print(f'Found {len(teams_list)} teams')
133        teams = {team.id: team for team in teams_list}
134
135        # identify spaces to leave based on last activity
136        leave = [space for space in spaces if space.last_activity and space.last_activity <= cutoff]
137
138        # sometimes the last_activity information seems to be "off" try to get the latest message for each space to
139        # verify latest activity
140        print(f'Getting latest message for {len(leave)} spaces...')
141        latest_messages = await asyncio.gather(*[latest_message_in_space(api, space) for space in leave])
142        validated_leave: list[tuple[Room, Optional[datetime]]] = []
143        for space, latest_message in zip(leave, latest_messages):
144            space: Room
145            latest_message: Message
146            # don't leave general spaces of Teams
147            # A general space is a space that has the same name as the Team it belongs to
148            team: Team
149            if space.team_id and (team := teams.get(space.team_id, None)) and space.title == team.name:
150                print(f'Don\'t leave general space "{space.title}" of Team "{team.name}"')
151                continue
152
153            # don't leave spaces in keep file
154            if space.title.strip() in keep:
155                print(f'Don\'t leave space "{space.title}", found space name in keep file')
156                continue
157
158            # if only spaces with no messages should be considered and we have a message in the space then don't leave
159            if args.no_messages and latest_message is not None:
160                print(
161                    f'Space "{space.title}" has messages, latest is {latest_message.created:%Y.%m.%d %H:%M:%S}, '
162                    f'last activity is {space.last_activity:%Y.%m.%d %H:%M:%S} - not leaving'
163                )
164                continue
165
166            # if no latest message or latest message is older than cutoff, consider leaving
167            if latest_message is None or latest_message.created <= cutoff:
168                latest_messages: Message
169                validated_leave.append((space, latest_message and latest_message.created))
170                continue
171            print(
172                f'Latest message in "{space.title}" is {latest_message.created:%Y.%m.%d %H:%M:%S}, '
173                f'last activity is {space.last_activity:%Y.%m.%d %H:%M:%S} - not leaving'
174            )
175        print()
176        print(f'Found {len(validated_leave)} spaces to leave:')
177
178        # sort spaces by latest activity or latest message
179        validated_leave.sort(key=lambda x: max(x[0].last_activity, x[1]) if x[1] else x[0].last_activity, reverse=False)
180
181        for space, latest in validated_leave:
182            print(f'Leave "{space.title}"')
183            print(
184                f'   last activity {space.last_activity:%Y.%m.%d %H:%M:%S}, latest message '
185                f'{f"{latest:%Y.%m.%d %H:%M:%S}" if latest else "none"}'
186            )
187
188        if args.no_test:
189            # actually try to leave the spaces
190            await leave_spaces(api, (space for space, _ in validated_leave))
191
192
193if __name__ == '__main__':
194    logging.basicConfig(level=logging.INFO)
195    asyncio.run(as_main())

Provision location level access codes from a CSV file

Provision location level access codes from a CSV file

 Usage: access_codes.py [OPTIONS] CSV_FILE

 Provision location level access codes from a CSV file

╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
│ *    csv_file      PATH  CSV file with access codes [required]                             │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
│ --dry-run                         Do not make any changes                                  │
│ --token                     TEXT  Access token can be provided using --token argument, set │
│                                   in WEBEX_ACCESS_TOKEN environment variable or can be a   │
│                                   service app token. For the latter set environment        │
│                                   variables ('SERVICE_APP_REFRESH_TOKEN',                  │
│                                   'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET').   │
│                                   Environment variables can also be set in                 │
│                                   service_app.env                                          │
│ --install-completion              Install completion for the current shell.                │
│ --show-completion                 Show completion for the current shell, to copy it or     │
│                                   customize the installation.                              │
│ --help                            Show this message and exit.                              │
╰────────────────────────────────────────────────────────────────────────────────────────────╯

Source: access_codes.py

  1#!/usr/bin/env -S uv run --script
  2# /// script
  3# requires-python = ">=3.11,<3.14"
  4# dependencies = [
  5#     "typer",
  6#     "wxc-sdk",
  7#     "python-dotenv",
  8# ]
  9# ///
 10"""
 11Provision location level access codes from a CSV file
 12the CSV file should have the following columns:
 13- location: location name
 14- code: the access code
 15- description: a description of the access code
 16- operation: one of 'add', 'delete'
 17
 18Usage: access_codes.py [OPTIONS] CSV_FILE
 19
 20 Provision location level access codes from a CSV file
 21
 22╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
 23│ *    csv_file      PATH  CSV file with access codes [required]                             │
 24╰────────────────────────────────────────────────────────────────────────────────────────────╯
 25╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
 26│ --dry-run                         Do not make any changes                                  │
 27│ --token                     TEXT  Access token can be provided using --token argument, set │
 28│                                   in WEBEX_ACCESS_TOKEN environment variable or can be a   │
 29│                                   service app token. For the latter set environment        │
 30│                                   variables ('SERVICE_APP_REFRESH_TOKEN',                  │
 31│                                   'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET').   │
 32│                                   Environment variables can also be set in                 │
 33│                                   access_codes.env                                         │
 34│ --install-completion              Install completion for the current shell.                │
 35│ --show-completion                 Show completion for the current shell, to copy it or     │
 36│                                   customize the installation.                              │
 37│ --help                            Show this message and exit.                              │
 38╰────────────────────────────────────────────────────────────────────────────────────────────╯
 39
 40"""
 41import asyncio
 42import csv
 43import logging
 44import sys
 45from collections import defaultdict
 46from functools import reduce
 47from os.path import basename
 48from pathlib import Path
 49from typing import Literal, Optional
 50
 51import typer
 52from dotenv import load_dotenv
 53from pydantic import BaseModel, TypeAdapter
 54from service_app import SERVICE_APP_ENVS, env_path, get_tokens
 55
 56from wxc_sdk.as_api import AsWebexSimpleApi
 57from wxc_sdk.common import AuthCode
 58from wxc_sdk.telephony.location import TelephonyLocation
 59from wxc_sdk.tokens import Tokens
 60
 61
 62class CSVOperation(BaseModel):
 63    """
 64    row in CSV file: Operation to be performed on an access code
 65    """
 66    location: str
 67    code: str
 68    description: str
 69    operation: Literal['add', 'delete']
 70
 71
 72async def process_operations(*, operations: list[CSVOperation], token: str, dry_run:bool):
 73    """
 74    Process the operations from the CSV file in parallel
 75    """
 76
 77    # noinspection PyShadowingNames
 78    async def process_one_location(*, location: str, operations: list[CSVOperation]):
 79        """
 80        process operations for one location
 81        """
 82        # find the location
 83        loc = locations.get(location)
 84        if loc is None:
 85            print(f'Location {location} not found', sys.stderr)
 86            return
 87
 88        # get existing codes in location
 89        ac_codes: dict[str, AuthCode] = {ac.code: ac
 90                                         for ac in await api.telephony.access_codes.read(location_id=loc.location_id)}
 91
 92        tasks = []
 93        delete_operations = [operation
 94                             for operation in operations
 95                             if operation.operation == 'delete']
 96        non_existing = [op
 97                        for op in delete_operations
 98                        if op.code not in ac_codes]
 99        if non_existing:
100            print(f'Location {location}: access codes not found: {", ".join(op.code for op in non_existing)}',
101                  file=sys.stderr)
102        to_delete = [ac
103                     for ac in delete_operations
104                     if ac.code in ac_codes]
105        print(f'Location {location}: deleting access codes: {", ".join(op.code for op in to_delete)}')
106        if to_delete:
107            tasks.append(api.telephony.access_codes.delete_codes(location_id=loc.location_id,
108                                                                 access_codes=[op.code for op in to_delete]))
109
110        add_operations = [operation
111                          for operation in operations
112                          if operation.operation == 'add']
113        existing = [op for op in add_operations if op.code in ac_codes]
114        if existing:
115            print(f'Location {location}: access codes already exist: {", ".join(op.code for op in existing)}',
116                  file=sys.stderr)
117        to_add = [ac for ac in add_operations if ac.code not in ac_codes]
118        print(f'Location {location}: adding access codes: {", ".join(op.code for op in to_add)}')
119        if to_add:
120            tasks.append(api.telephony.access_codes.create(location_id=loc.location_id,
121                                                           access_codes=[AuthCode(code=op.code,
122                                                                                  description=op.description)
123                                                                         for op in to_add]))
124        if tasks and not dry_run:
125            await asyncio.gather(*tasks)
126
127    async with AsWebexSimpleApi(tokens=token) as api:
128        # get locations
129        locations: dict[str, TelephonyLocation] = {loc.name: loc for loc in await api.telephony.locations.list()}
130
131        # group operations by location
132        operations_by_location: dict[str, list[CSVOperation]] = reduce(
133            lambda acc, op: acc[op.location].append(op) or acc,
134            operations,
135            defaultdict(list))
136
137        # process all locations in parallel
138        await asyncio.gather(*[process_one_location(location=loc, operations=ops)
139                               for loc, ops in operations_by_location.items()])
140    return
141
142
143app = typer.Typer()
144
145
146@app.command(name=basename(__file__),
147             help='Provision location level access codes from a CSV file')
148def main(csv_file: Path = typer.Argument(exists=True,
149                                         help='CSV file with access codes'),
150         dry_run: bool = typer.Option(False, '--dry-run',
151                                      help='Do not make any changes'),
152
153         token: Optional[str] = typer.Option(None, '--token',
154                                             help=f'Access token can be provided using --token argument, '
155                                                  f'set in WEBEX_ACCESS_TOKEN environment variable or '
156                                                  f'can be a service app token. For the latter set '
157                                                  f'environment variables {SERVICE_APP_ENVS}. '
158                                                  f'Environment variables can also be set in '
159                                                  f'{env_path()}')):
160    # get access token
161    load_dotenv(env_path(), override=True)
162    tokens = get_tokens() if token is None else Tokens(access_token=token)
163    if tokens is None:
164        print(f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable or '
165              f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
166              f'variables can '
167              f'also be set in {env_path()}', file=sys.stderr)
168        exit(1)
169
170    csv_file = str(csv_file)
171    # read CSV file
172    with open(csv_file) as f:
173        reader = csv.DictReader(f)
174        data = list(reader)
175
176    # validate the data from the CSV file
177    csv_operations = TypeAdapter(list[CSVOperation]).validate_python(data)
178
179    asyncio.run(process_operations(operations=csv_operations,
180                                   token=tokens,
181                                   dry_run=dry_run))
182
183
184if __name__ == '__main__':
185    logging.basicConfig(level=logging.DEBUG)
186    app()

Bulk assign/unassign agents to/from call queues

Bulk assign/unassign agents to/from call queues

usage: queue_agents.py [-h] (--add | --remove) --queues QUEUES --agent AGENT [--token TOKEN]
                       [--debug] [--har] [--dry-run]

Bulk manage agents in call queues

options:
  -h, --help       show this help message and exit
  --add            Add agent(s) to specified queues
  --remove         Remove agent(s) from specified queues
  --queues QUEUES  Text file with list of call queue names (one per line). Each line should
                   be the a location name and a queue name separated by a colon. Example:
                   "Location1:Queue1"
  --agent AGENT    Single agent email address or text file with agent email addresses (one
                   per line)
  --token TOKEN    admin access token to use. If no token is given then the script will try
                   to use service app tokens. The service app parameters are read from
                   environment variables SERVICE_APP_ID, SERvICE_APP_SECRET, and
                   SERVICE_APP_REFRESH. These parameters can also be defined in
                   "queue_agents.env" file. Service app tokens are cached in
                   "queue_agents.yml". If no access token is passed and no service app is
                   defined then the script falls back to try to read an access token from
                   environment variable WEBEX_ACCESS_TOKEN.
  --debug          Enable debug output
  --har            Enable HAR output
  --dry-run        Simulate the operation without making actual changes

Example: queue_agents.py --queues queues.txt --agent agents.txt --add

Source: queue_agents.py

  1#!/usr/bin/env python3
  2"""
  3Bulk manage agents in call queues
  4
  5usage: queue_agents.py [-h] (--add | --remove) --queues QUEUES --agent AGENT [--token TOKEN]
  6
  7Bulk manage agents in call queues
  8
  9options:
 10  -h, --help       show this help message and exit
 11  --add            Add agent(s) to specified queues
 12  --remove         Remove agent(s) from specified queues
 13  --queues QUEUES  Text file with list of call queue names (one per line). Each line should be the a
 14                   location name and a queue name separated by a colon. Example: "Location1:Queue1"
 15  --agent AGENT    Single agent email address or text file with agent email addresses (one per line)
 16  --token TOKEN    admin access token to use. If no token is given then the script will try to use
 17                   service app tokens. The service app parameters are read from environment variables
 18                   SERVICE_APP_ID, SERvICE_APP_SECRET, and SERVICE_APP_REFRESH. These parameters can
 19                   also be defined in "queue_agents.env" file. Service app tokens are cached in
 20                   "queue_agents.yml". If no access token is passed and no service app is defined then
 21                   the script falls back to try to read an access token from environment variable
 22                   WEBEX_ACCESS_TOKEN.
 23
 24Example: queue_agents.py --queues queues.txt --agent agents.txt --add
 25"""
 26
 27import argparse
 28import asyncio
 29import logging
 30import os
 31import sys
 32from collections.abc import Iterable
 33from contextlib import contextmanager
 34from typing import List, Optional
 35
 36import yaml
 37from dotenv import load_dotenv
 38
 39from wxc_sdk import Tokens
 40from wxc_sdk.as_api import AsWebexSimpleApi
 41from wxc_sdk.har_writer import HarWriter
 42from wxc_sdk.integration import Integration
 43from wxc_sdk.people import Person
 44from wxc_sdk.telephony.callqueue import CallQueue
 45from wxc_sdk.telephony.hg_and_cq import Agent
 46
 47
 48def yml_path() -> str:
 49    """
 50    Get filename for YML file to cache access and refresh token
 51    """
 52    return f'{os.path.splitext(os.path.basename(__file__))[0]}.yml'
 53
 54
 55def env_path() -> str:
 56    """
 57    Get path to .env file to read service app settings from
 58    :return:
 59    """
 60    return f'{os.path.splitext(os.path.basename(__file__))[0]}.env'
 61
 62
 63def read_tokens_from_file() -> Optional[Tokens]:
 64    """
 65    Get service app tokens from cache file, return None if cache does not exist
 66    """
 67    path = yml_path()
 68    if not os.path.isfile(path):
 69        return None
 70    try:
 71        with open(path) as f:
 72            data = yaml.safe_load(f)
 73        tokens = Tokens.model_validate(data)
 74    except Exception:
 75        return None
 76    return tokens
 77
 78
 79def write_tokens_to_file(tokens: Tokens):
 80    """
 81    Write tokens to cache
 82    """
 83    with open(yml_path(), mode='w') as f:
 84        yaml.safe_dump(tokens.model_dump(exclude_none=True), f)
 85
 86
 87def get_access_token() -> Optional[Tokens]:
 88    """
 89    Get a new access token using refresh token, service app client id, service app client secret
 90    """
 91    env_vars = ('SERVICE_APP_ID', 'SERVICE_APP_SECRET', 'SERVICE_APP_REFRESH')
 92    app_id, app_secret, app_refresh = (os.getenv(var) for var in env_vars)
 93    if not all((app_id, app_secret, app_refresh)):
 94        return None
 95    tokens = Tokens(refresh_token=app_refresh)
 96    integration = Integration(client_id=app_id, client_secret=app_secret, scopes=[], redirect_url=None)
 97    integration.refresh(tokens=tokens)
 98    write_tokens_to_file(tokens)
 99    return tokens
100
101
102def get_tokens() -> Optional[Tokens]:
103    """
104    Get tokens from cache or create new access token using service app credentials
105    """
106    # try to read from file
107    tokens = read_tokens_from_file()
108    # .. or create new access token using refresh token
109    if tokens is None:
110        tokens = get_access_token()
111        if tokens is None:
112            return None
113    if tokens.remaining < 24 * 60 * 60:
114        tokens = get_access_token()
115    return tokens
116
117
118def read_file_lines(filename: str) -> List[str]:
119    """
120    Read lines from a file, stripping whitespace and removing empty lines.
121
122    Args:
123        filename (str): Path to the input file
124
125    Returns:
126        List[str]: Cleaned list of lines from the file
127
128    Raises:
129        FileNotFoundError: If the specified file does not exist
130        PermissionError: If there are permission issues reading the file
131    """
132    try:
133        with open(filename) as f:
134            return [line.strip() for line in f if line.strip()]
135    except FileNotFoundError:
136        print(f"Error: File '{filename}' not found.", file=sys.stderr)
137        sys.exit(1)
138    except PermissionError:
139        print(f"Error: Permission denied reading file '{filename}'.", file=sys.stderr)
140        sys.exit(1)
141
142
143async def process_one_queue(
144    *, api: AsWebexSimpleApi, queue: CallQueue, agents: list[Person], action: str, dry_run: bool = True
145):
146    """
147    Process adding or removing agents from a single call queue.
148    """
149    # get agents
150    details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
151
152    agent: Agent
153    agents_in_queue = set(agent.agent_id for agent in details.agents)
154    agent_ids = set(person.person_id for person in agents)
155    if action == 'add':
156        agents_to_add = agent_ids - agents_in_queue
157        if not agents_to_add:
158            print(f'All agents are already in queue {queue.name} in location {queue.location_name}. Skipping.')
159            return
160        agent_str = ', '.join(
161            sorted(
162                f'{person.display_name}({person.emails[0]})' for person in agents if person.person_id in agents_to_add
163            )
164        )
165        print(f'Adding agents to queue {queue.name} in location {queue.location_name}: {agent_str}')
166        details.agents.extend([Agent(agent_id=agent_id) for agent_id in agents_to_add])
167    else:
168        agents_to_remove = agents_in_queue & agent_ids
169        if not agents_to_remove:
170            print(f'No agents to remove from queue {queue.name} in location {queue.location_name}. Skipping.')
171            return
172        agent_str = ', '.join(
173            sorted(
174                f'{person.display_name}({person.emails[0]})'
175                for person in agents
176                if person.person_id in agents_to_remove
177            )
178        )
179        print(f'Removing agents from queue {queue.name} in location {queue.location_name}: {agent_str}')
180        details.agents = [agent for agent in details.agents if agent.agent_id not in agents_to_remove]
181    update = CallQueue(agents=details.agents)
182    if not dry_run:
183        await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id, update=update)
184
185
186async def validate_queues(api: AsWebexSimpleApi, queues: Iterable[str]) -> list[CallQueue]:
187    """
188    Validate queue names and return a list of CallQueue objects
189    """
190    # validate queue names
191    existing_queues = {f'{queue.location_name}:{queue.name}': queue for queue in await api.telephony.callqueue.list()}
192    validated_queues = []
193    for queue in queues:
194        if queue not in existing_queues:
195            print(f"Queue '{queue}' does not exist. Skipping.", file=sys.stderr)
196        else:
197            validated_queues.append(existing_queues[queue])
198
199    return validated_queues
200
201
202async def validate_users(api: AsWebexSimpleApi, users: Iterable[str]) -> list[Person]:
203    """
204    Validate user emails and return a list of Person objects
205    """
206    existing_users = {user.emails[0].lower(): user for user in await api.people.list()}
207    validated_users = []
208    for user in users:
209        if user.lower() not in existing_users:
210            print(f"User '{user}' does not exist. Skipping.", file=sys.stderr)
211        else:
212            validated_users.append(existing_users[user.lower()])
213
214    return validated_users
215
216
217def main():
218    """
219    Main CLI script entry point for managing call queue agents.
220    """
221    parser = argparse.ArgumentParser(
222        description='Bulk manage agents in call queues',
223        epilog='Example: %(prog)s --queues queues.txt --agent agents.txt --add',
224    )
225
226    # Mutually exclusive group for add/remove actions
227    action_group = parser.add_mutually_exclusive_group(required=True)
228    action_group.add_argument(
229        '--add', action='store_const', const='add', dest='action', help='Add agent(s) to specified queues'
230    )
231    action_group.add_argument(
232        '--remove', action='store_const', const='remove', dest='action', help='Remove agent(s) from specified queues'
233    )
234
235    # Input source arguments
236    parser.add_argument(
237        '--queues',
238        required=True,
239        help='Text file with list of call queue names (one per line). Each line should be the a '
240        'location name and a queue name separated by a colon. Example: "Location1:Queue1"',
241    )
242    parser.add_argument(
243        '--agent',
244        required=True,
245        help='Single agent email address or text file with agent email addresses (one per line)',
246    )
247    parser.add_argument(
248        '--token',
249        type=str,
250        required=False,
251        help=f'admin access token to use. If no token is given then the script will try to use '
252        f'service app tokens. The service app parameters are read from environment variables '
253        f'SERVICE_APP_ID, SERvICE_APP_SECRET, and SERVICE_APP_REFRESH. These parameters can also '
254        f'be defined in "{os.path.splitext(os.path.basename(__file__))[0]}.env" file. Service '
255        f'app tokens are cached in "'
256        f'{os.path.splitext(os.path.basename(__file__))[0]}.yml". If no access token is passed '
257        f'and no service app '
258        f'is defined then the script falls back to try to read an access token from environment '
259        f'variable WEBEX_ACCESS_TOKEN.',
260    )
261
262    # Debug and HAR output arguments
263    parser.add_argument('--debug', action='store_true', help='Enable debug output')
264    parser.add_argument('--har', action='store_true', help='Enable HAR output')
265
266    # fry run option
267    parser.add_argument('--dry-run', action='store_true', help='Simulate the operation without making actual changes')
268
269    # Parse arguments
270    args = parser.parse_args()
271
272    # Get access token
273    token = args.token
274    load_dotenv(os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env'))
275
276    token = token or (tokens := get_tokens()) and tokens.access_token
277
278    # Read queues from file
279    queues = read_file_lines(args.queues)
280
281    # Determine if agent is a file or a single agent email
282    try:
283        agents = read_file_lines(args.agent) if os.path.isfile(args.agent) else [args.agent]
284    except FileNotFoundError:
285        agents = [args.agent]
286
287    if args.debug:
288        logging.basicConfig(level=logging.DEBUG)
289
290    async def as_main():
291        """
292        Async main entry point
293        """
294
295        @contextmanager
296        def har_writer():
297            """
298            optional context manager to write HAR file
299            """
300            if args.har:
301                with HarWriter(f'{os.path.splitext(os.path.basename(__file__))[0]}.har', api):
302                    yield None
303            else:
304                yield None
305
306        async with AsWebexSimpleApi(tokens=token) as api:
307            with har_writer():
308                # validate queues and agents
309                validated_queues, validated_agents = await asyncio.gather(
310                    validate_queues(api, queues), validate_users(api, agents)
311                )
312                if not validated_queues:
313                    print('No valid queues found. Exiting.', file=sys.stderr)
314                    sys.exit(1)
315                if not validated_agents:
316                    print('No valid agents found. Exiting.', file=sys.stderr)
317                    sys.exit(1)
318                validated_queues: List[CallQueue]
319                validated_agents: List[Person]
320
321                # process all queues in parallel
322                await asyncio.gather(
323                    *[
324                        process_one_queue(
325                            api=api, queue=queue, agents=validated_agents, action=args.action, dry_run=args.dry_run
326                        )
327                        for queue in validated_queues
328                    ],
329                    return_exceptions=False,
330                )
331            # with
332        return
333
334    asyncio.run(as_main())
335
336    print(f'Completed {args.action} operation for {len(agents)} agent(s) across {len(queues)} queue(s).')
337
338
339if __name__ == '__main__':
340    main()

Bulk add phone numbers to locations

Bulk add TNs to Webex Calling locations

 Usage: add_numbers.py [OPTIONS] FILE

 Add TNs to Webex Calling locations

╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
│ *    file      PATH  CSV file with location names and TNs [required]                       │
╰────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
│ --dry-run                         Do not make any changes                                  │
│ --verbose                         Print debug information                                  │
│ --log-file                  PATH  Log file. If extension is .har, log in HAR format        │
│ --token                     TEXT  Access token can be provided using --token argument, set │
│                                   in WEBEX_ACCESS_TOKEN environment variable or can be a   │
│                                   service app token. For the latter set environment        │
│                                   variables ('SERVICE_APP_REFRESH_TOKEN',                  │
│                                   'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET').   │
│                                   Environment variables can also be set in add_numbers.env │
│ --inactive                        Add TNs as inactive                                      │
│ --install-completion              Install completion for the current shell.                │
│ --show-completion                 Show completion for the current shell, to copy it or     │
│                                   customize the installation.                              │
│ --help                            Show this message and exit.                              │
╰────────────────────────────────────────────────────────────────────────────────────────────╯

 Example: ./add_numbers.py add_numbers.csv --log-file add_numbers.har --dry-run

Source: add_numbers.py

  1#!/usr/bin/env -S uv run --script
  2#
  3# /// script
  4# requires-python = ">=3.11,<3.14"
  5# dependencies = [
  6#     "python-dotenv",
  7#     "typer",
  8#     "wxc-sdk",
  9# ]
 10# ///
 11"""
 12Add TNs to a locations.
 13
 14 Usage: add_numbers.py [OPTIONS] FILE
 15
 16 Add TNs to Webex Calling locations
 17
 18╭─ Arguments ────────────────────────────────────────────────────────────────────────────────╮
 19│ *    file      PATH  CSV file with location names and TNs [required]                       │
 20╰────────────────────────────────────────────────────────────────────────────────────────────╯
 21╭─ Options ──────────────────────────────────────────────────────────────────────────────────╮
 22│ --dry-run                         Do not make any changes                                  │
 23│ --verbose                         Print debug information                                  │
 24│ --log-file                  PATH  Log file. If extension is .har, log in HAR format        │
 25│ --token                     TEXT  Access token can be provided using --token argument, set │
 26│                                   in WEBEX_ACCESS_TOKEN environment variable or can be a   │
 27│                                   service app token. For the latter set environment        │
 28│                                   variables ('SERVICE_APP_REFRESH_TOKEN',                  │
 29│                                   'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET').   │
 30│                                   Environment variables can also be set in add_numbers.env │
 31│ --inactive                        Add TNs as inactive                                      │
 32│ --install-completion              Install completion for the current shell.                │
 33│ --show-completion                 Show completion for the current shell, to copy it or     │
 34│                                   customize the installation.                              │
 35│ --help                            Show this message and exit.                              │
 36╰────────────────────────────────────────────────────────────────────────────────────────────╯
 37
 38 Example: ./add_numbers.py add_numbers.csv --log-file add_numbers.har --dry-run
 39
 40"""
 41
 42import asyncio
 43import csv
 44import logging
 45import os
 46import sys
 47from collections import Counter, defaultdict
 48from contextlib import contextmanager
 49from functools import wraps
 50from itertools import chain
 51from pathlib import Path
 52from typing import Optional
 53
 54import typer
 55from dotenv import load_dotenv
 56from service_app import SERVICE_APP_ENVS, env_path, get_tokens
 57
 58from wxc_sdk.as_api import AsWebexSimpleApi
 59from wxc_sdk.common import NumberState
 60from wxc_sdk.har_writer import HarWriter
 61from wxc_sdk.tokens import Tokens
 62
 63BATCH_SIZE = 10
 64
 65
 66@contextmanager
 67def setup_logging(*, api: AsWebexSimpleApi, verbose: bool, log_file: Optional[Path]):
 68    """
 69    Set up logging
 70    """
 71
 72    @contextmanager
 73    def file_handler():
 74        if not log_file:
 75            yield
 76        else:
 77            # log to file or to HAR
 78            log_file_str = str(log_file)
 79            if os.path.splitext(log_file_str)[-1].lower() == '.har':
 80                with HarWriter(api=api, path=log_file_str):
 81                    yield
 82            else:
 83                f_handler = logging.FileHandler(log_file_str)
 84                f_handler.setLevel(logging.DEBUG)
 85                f_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
 86                logging.getLogger().addHandler(f_handler)
 87                yield
 88        return
 89
 90    logging.getLogger().setLevel(logging.DEBUG)
 91    # create a console logging handler
 92    console_handler = logging.StreamHandler()
 93    console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
 94    # console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
 95    logging.getLogger().addHandler(console_handler)
 96    with file_handler():
 97        yield
 98
 99
100def read_csv(file: str) -> dict[str, list[str]]:
101    """
102    Read CSV file with location names and TNs
103    """
104    if not os.path.isfile(file):
105        logging.error(f'File {file} does not exist')
106        exit(1)
107    err = False
108    locations_and_tns: dict[str, list[str]] = defaultdict(list)
109    with open(file) as f:
110        reader = csv.reader(f)
111        for row in reader:
112            if not row:
113                continue
114            if len(row) != 2:
115                logging.error(f'Invalid row: {row}')
116                err = True
117                continue
118            location, tn = row
119            logging.debug(f'Location: {location}, TN: {tn}')
120            locations_and_tns[location].append(tn)
121    if err:
122        logging.error('Errors in the CSV file')
123        exit(1)
124    return locations_and_tns
125
126
127async def verify_location(api: AsWebexSimpleApi, location_name: str) -> Optional[str]:
128    """
129    Verify that a location exists and return location id
130    """
131    try:
132        location = next(
133            (l for l in await api.telephony.locations.list(name=location_name) if l.name == location_name), None
134        )
135        return location and location.location_id
136    except Exception as e:
137        logging.error(f'Failed to get location {location_name}: {e}')
138        return None
139
140
141async def validate_tns(api: AsWebexSimpleApi, tns: list[str]) -> bool:
142    """
143    Validate TNs and return validity
144    """
145    tn_counter = Counter(tns)
146    err = False
147    for tn, count in tn_counter.items():
148        if count > 1:
149            err = True
150            logging.error(f'TN {tn} is duplicated')
151    if err:
152        return False
153
154    # validate numbers in batches
155    try:
156        validations = await asyncio.gather(
157            *[api.telephony.validate_phone_numbers(tns[i : i + BATCH_SIZE]) for i in range(0, len(tns), BATCH_SIZE)]
158        )
159    except Exception as e:
160        logging.error(f'Failed to validate TNs: {e}')
161        return False
162
163    ok_tns = []
164    err = False
165    for status in chain.from_iterable(v.phone_numbers for v in validations):
166        if status.ok:
167            ok_tns.append(status.phone_number)
168        else:
169            err = True
170            logging.error(f'TN {status.phone_number}: {status.state} ')
171    return not err
172
173
174async def add_tns(api: AsWebexSimpleApi, location_id: str, tns: list[str], inactive: bool):
175    """
176    Add TNs to a location
177    """
178    number_state = NumberState.inactive if inactive else NumberState.active
179    try:
180        # add TNs in batches
181        await asyncio.gather(
182            *[
183                api.telephony.location.number.add(
184                    location_id=location_id, phone_numbers=tns[i : i + BATCH_SIZE], state=number_state
185                )
186                for i in range(0, len(tns), BATCH_SIZE)
187            ]
188        )
189    except Exception as e:
190        logging.error(f'Failed to add TNs: {e}')
191        return False
192    return True
193
194
195def async_command(f):
196    @wraps(f)
197    def wrapper(*args, **kwargs):
198        return asyncio.run(f(*args, **kwargs))
199
200    return wrapper
201
202
203app = typer.Typer()
204
205
206@app.command(
207    epilog=f'Example: {sys.argv[0]} add_numbers.csv --log-file add_numbers.har --dry-run',
208    help='Add TNs to Webex Calling locations',
209)
210@async_command
211async def add_numbers(
212    file: Path = typer.Argument(exists=True, help='CSV file with location names and TNs'),
213    dry_run: bool = typer.Option(False, '--dry-run', help='Do not make any changes'),
214    verbose: bool = typer.Option(False, '--verbose', help='Print debug information'),
215    log_file: Optional[Path] = typer.Option(
216        None, '--log-file', help='Log file. If extension is .har, log in HAR format'
217    ),
218    token: Optional[str] = typer.Option(
219        None,
220        '--token',
221        help=f'Access token can be provided using --token argument, '
222        f'set in WEBEX_ACCESS_TOKEN environment variable or '
223        f'can be a service app token. For the latter set '
224        f'environment variables {SERVICE_APP_ENVS}. '
225        f'Environment variables can also be set in '
226        f'{env_path()}',
227    ),
228    inactive: bool = typer.Option(False, '--inactive', help='Add TNs as inactive'),
229):
230    """
231    Add TNs to Webex Calling locations
232    """
233    load_dotenv(env_path(), override=True)
234    tokens = get_tokens() if token is None else Tokens(access_token=token)
235    if tokens is None:
236        print(
237            f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable or '
238            f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
239            f'variables can '
240            f'also be set in {env_path()}',
241            file=sys.stderr,
242        )
243        exit(1)
244
245    async with AsWebexSimpleApi(tokens=tokens) as api:
246        with setup_logging(api=api, verbose=verbose, log_file=log_file):
247            # validate the access token
248            try:
249                await api.people.me()
250            except Exception as e:
251                logging.error(f'Failed to get identity: {e}')
252                logging.error('Token might be invalid')
253                exit(1)
254            # read location names and TNs from a CSV file
255            logging.info(f'Reading file {file}')
256            locations_and_tns = read_csv(file)
257
258            # validate the location names
259            logging.info('Validating location names...')
260            location_ids = await asyncio.gather(
261                *[verify_location(api, location) for location in locations_and_tns.keys()]
262            )
263            if not all(location_ids):
264                for location_name, location_id in zip(locations_and_tns.keys(), location_ids):
265                    if not location_id:
266                        logging.error(f'Location {location_name} does not exist')
267                exit(1)
268
269            # validate the TNs
270            logging.info('Validating TNs...')
271            tn_list = list(chain.from_iterable(locations_and_tns.values()))
272            validation = await validate_tns(api, tn_list)
273            if not validation:
274                exit(1)
275
276            # add TNs to the locations
277            if not dry_run:
278                logging.info('Adding TNs...')
279                results = await asyncio.gather(
280                    *[
281                        add_tns(api, location_id, tns, inactive)
282                        for location_id, tns in zip(location_ids, locations_and_tns.values())
283                    ]
284                )
285                if not all(results):
286                    exit(1)
287            #
288            logging.info('Done')
289        # end of logging context
290    # end of API context
291    return
292
293
294if __name__ == '__main__':
295    app()

Bulk add outgoing call permission patterns to locations

Bulk add outgoing call permission patterns to locations

usage: ocp_pattern.py [-h] [--token TOKEN] [--dry-run] [--verbose] [--log-file LOG_FILE]
                      location patterns

Provision OCP patterns for one or all locations

positional arguments:
  location             Location to provision OCP patterns for. Use "all" to provision for
                       all locations
  patterns             File with patterns to provision. File has one pattern per line. Use
                       "remove" to remove all patterns previously provisioned by the
                       script

optional arguments:
  -h, --help           show this help message and exit
  --token TOKEN        Access token can be provided using --token argument, set in
                       WEBEX_ACCESS_TOKEN environment variable or can be a service app
                       token. For the latter set environment variables
                       ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID',
                       'SERVICE_APP_CLIENT_SECRET'). Environment variables can also be set
                       in ocp_pattern.env
  --dry-run            Dry run, do not provision anything
  --verbose            Print debug information
  --log-file LOG_FILE  Log file. If extension is .har, log in HAR format

Example: ocp_pattern.py all ocp_pattern.txt --log-file ocp_pattern.har --dry-run

Source: ocp_pattern.py

  1#!/usr/bin/env python3
  2"""
  3Provision OCP patterns for one or all locations
  4
  5usage: ocp_pattern.py [-h] [--token TOKEN] [--dry-run] [--verbose] [--log-file LOG_FILE]
  6                      location patterns
  7
  8Provision OCP patterns for one or all locations
  9
 10positional arguments:
 11  location             Location to provision OCP patterns for. Use "all" to provision for
 12                       all locations
 13  patterns             File with patterns to provision. File has one pattern per line. Use
 14                       "remove" to remove all patterns previously provisioned by the
 15                       script
 16
 17optional arguments:
 18  -h, --help           show this help message and exit
 19  --token TOKEN        Access token can be provided using --token argument, set in
 20                       WEBEX_ACCESS_TOKEN environment variable or can be a service app
 21                       token. For the latter set environment variables
 22                       ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID',
 23                       'SERVICE_APP_CLIENT_SECRET'). Environment variables can also be set
 24                       in ocp_pattern.env
 25  --dry-run            Dry run, do not provision anything
 26  --verbose            Print debug information
 27  --log-file LOG_FILE  Log file. If extension is .har, log in HAR format
 28
 29Example: ocp_pattern.py all ocp_pattern.txt --log-file ocp_pattern.har --dry-run
 30"""
 31
 32import argparse
 33import asyncio
 34import logging
 35import os
 36import sys
 37from contextlib import contextmanager
 38
 39from dotenv import load_dotenv
 40
 41from examples.service_app import SERVICE_APP_ENVS, env_path, get_tokens
 42from wxc_sdk.as_api import AsWebexSimpleApi
 43from wxc_sdk.as_rest import AsRestError
 44from wxc_sdk.har_writer import HarWriter
 45from wxc_sdk.person_settings.permissions_out import Action, DigitPattern
 46from wxc_sdk.telephony.location import TelephonyLocation
 47from wxc_sdk.tokens import Tokens
 48
 49
 50async def work_on_one_location(
 51    api: AsWebexSimpleApi, location: TelephonyLocation, pattern_list: list[str], dry_run: bool
 52):
 53    """
 54    Work on one location, create, update or remove patterns
 55    """
 56
 57    async def create(pattern: str):
 58        try:
 59            await dapi.create(
 60                location.location_id,
 61                DigitPattern(pattern=pattern, name=f'ocp-{pattern}', action=Action.allow, transfer_enabled=True),
 62            )
 63            print(f'{location.name}: created pattern {pattern}')
 64        except AsRestError as e:
 65            print(f'{location.name}: failed to create pattern {pattern}, error: {e}')
 66            raise
 67
 68    async def remove(pattern: DigitPattern):
 69        try:
 70            await dapi.delete(location.location_id, pattern.id)
 71            print(f'{location.name}: removed pattern {pattern.pattern}')
 72        except AsRestError as e:
 73            print(f'{location.name}: failed to remove pattern {pattern.pattern}, error: {e}')
 74            raise
 75
 76    async def update(pattern: DigitPattern):
 77        try:
 78            await dapi.update(
 79                location.location_id,
 80                DigitPattern(
 81                    pattern=pattern.pattern, name=f'ocp-{pattern.pattern}', action=Action.allow, transfer_enabled=True
 82                ),
 83            )
 84            print(f'{location.name}: updated pattern {pattern.pattern}')
 85        except AsRestError as e:
 86            print(f'{location.name}: failed to update pattern {pattern.pattern}, error: {e}')
 87            raise
 88
 89    pattern_set = set(pattern_list)
 90    dapi = api.telephony.permissions_out.digit_patterns
 91
 92    # get ocp patterns for location
 93    location_digit_patterns = await dapi.get_digit_patterns(location.location_id)
 94
 95    existing_patterns = [
 96        pattern for pattern in location_digit_patterns.digit_patterns if pattern.pattern in pattern_set
 97    ]
 98    missing_patterns = [
 99        pattern
100        for pattern in pattern_list
101        if pattern not in set(map(lambda p: p.pattern, location_digit_patterns.digit_patterns))
102    ]
103    to_be_removed = [
104        pattern
105        for pattern in location_digit_patterns.digit_patterns
106        if pattern.name.startswith('ocp-') and pattern.pattern not in pattern_set
107    ]
108    tasks = []
109
110    # remove patterns
111    for remove_pattern in to_be_removed:
112        # remove pattern
113        print(f'{location.name}: remove pattern {remove_pattern.pattern}')
114        if not dry_run:
115            tasks.append(remove(remove_pattern))
116
117    # add missing patterns
118    for pattern in missing_patterns:
119        # add pattern
120        print(f'{location.name}: add pattern {pattern}')
121        if not dry_run:
122            tasks.append(create(pattern))
123
124    # check existing patterns and update if action is not "allow"
125    for existing in existing_patterns:
126        if existing.action == Action.allow and existing.transfer_enabled:
127            # pattern already exists and is allowed
128            continue
129        # update existing pattern
130        print(f'{location.name}: update pattern {existing.pattern}')
131        if not dry_run:
132            tasks.append(update(existing))
133    if tasks:
134        # run tasks
135        await asyncio.gather(*tasks)
136    return
137
138
139@contextmanager
140def setup_logging(args: argparse.Namespace, api: AsWebexSimpleApi):
141    """
142    Set up logging
143    """
144
145    @contextmanager
146    def file_handler(log_file: str):
147        if not log_file:
148            yield
149        else:
150            # log to file or to HAR
151            if os.path.splitext(log_file)[-1].lower() == '.har':
152                with HarWriter(api=api, path=log_file):
153                    yield
154            else:
155                f_handler = logging.FileHandler(args.log_file)
156                f_handler.setLevel(logging.DEBUG)
157                f_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
158                logging.getLogger().addHandler(f_handler)
159                yield
160        return
161
162    logging.getLogger().setLevel(logging.DEBUG)
163    # create a console logging handler
164    console_handler = logging.StreamHandler()
165    console_handler.setLevel(logging.DEBUG if args.verbose else logging.INFO)
166    console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
167    logging.getLogger().addHandler(console_handler)
168    with file_handler(args.log_file):
169        yield
170
171
172async def main():
173    parser = argparse.ArgumentParser(
174        description='Provision OCP patterns for one or all locations',
175        epilog='Example: %(prog)s all ocp_pattern.txt --log-file ocp_pattern.har --dry-run',
176    )
177    parser.add_argument(
178        'location', type=str, help='Location to provision OCP patterns for. Use "all" to provision for all locations'
179    )
180    parser.add_argument(
181        'patterns',
182        type=str,
183        help='File with patterns to provision. File has one pattern '
184        'per line. Use "remove" to remove all patterns previously '
185        'provisioned '
186        'by the script',
187    )
188    parser.add_argument(
189        '--token',
190        help=f'Access token can be provided using --token argument, set in '
191        f'WEBEX_ACCESS_TOKEN environment variable or can be a service app token. For '
192        f'the latter set environment variables {SERVICE_APP_ENVS}. Environment '
193        f'variables can also be set in {env_path()}',
194    )
195    parser.add_argument('--dry-run', action='store_true', help='Dry run, do not provision anything')
196    parser.add_argument('--verbose', action='store_true', help='Print debug information')
197    parser.add_argument('--log-file', help='Log file. If extension is .har, log in HAR format')
198
199    args = parser.parse_args()
200    location_name = args.location
201    pattern_file = args.patterns
202    dry_run = args.dry_run
203
204    # get tokens
205    # if token is provided use that token, else try to read from file
206    load_dotenv(env_path(), override=True)
207    tokens = get_tokens() if args.token is None else Tokens(access_token=args.token)
208    if tokens is None:
209        print(
210            f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable '
211            f'or '
212            f'can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
213            f'variables can '
214            f'also be set in {env_path()}',
215            file=sys.stderr,
216        )
217        exit(1)
218    async with AsWebexSimpleApi(tokens=tokens, concurrent_requests=100) as api:
219        with setup_logging(args, api):
220            # validate location
221            if location_name.lower() == 'all':
222                # all locations
223                location_list = await api.telephony.locations.list()
224                location_list.sort(key=lambda loc: loc.name)
225            else:
226                # single location
227                location_list = [
228                    loc for loc in await api.telephony.locations.list(name=location_name) if loc.name == location_name
229                ]
230                if not location_list:
231                    print(f'Location {location_name} not found', file=sys.stderr)
232                    exit(1)
233
234            # read patterns from given file
235            if pattern_file.lower() == 'remove':
236                # remove all patterns
237                pattern_list = []
238            else:
239                try:
240                    with open(pattern_file) as f:
241                        pattern_list = [ps for p in f.readlines() if (ps := p.strip()) and not p.startswith('#')]
242                except FileNotFoundError:
243                    print(f'File {pattern_file} not found', file=sys.stderr)
244                    exit(1)
245
246            # apply changes to all locations
247            print(f'Working on {len(location_list)} location(s), {len(pattern_list)} patterns')
248            await asyncio.gather(*[work_on_one_location(api, loc, pattern_list, dry_run) for loc in location_list])
249    return
250
251
252if __name__ == '__main__':
253    asyncio.run(main())

Bulk provisioning of 3rd party devices in Workspaces

Bulk provisioning of 3rd party devices in Workspaces

usage: workspace_w_3rd_party.py [-h] [--token TOKEN] [--dry-run]
                                [--log-file LOG_FILE] [--cleanup]
                                csv [output]

Provision workspaces with 3rd party devices.

positional arguments:
  csv                  CSV with workspaces to provision. CSV has the
                       following columns: * workspace name: the workspace
                       will be created * location name: must be an
                       existing location * extension (optional); if
                       missing a new extension will be generated starting
                       at 2000 * MAC address; if empty a new (dummy) MAC
                       address will be generated as DEAD-DEAD-XXXX *
                       password (optional); if missing a new (random
                       password will be generated
  output               Output CSV with the provisioning results. Not
                       required in dry-run mode

optional arguments:
  -h, --help           show this help message and exit
  --token TOKEN        Access token can be provided using --token
                       argument, set in WEBEX_ACCESS_TOKEN environment
                       variable or can be a service app token. For the
                       latter set environment variables
                       ('SERVICE_APP_REFRESH_TOKEN',
                       'SERVICE_APP_CLIENT_ID',
                       'SERVICE_APP_CLIENT_SECRET'). Environment variables
                       can also be set in workspace_w_3rd_party.env
  --dry-run            Dry run, do not provision anything
  --log-file LOG_FILE  Log file. If extension is .har, log in HAR format
  --cleanup            remove workspaces

Example: workspace_w_3rd_party.py input.csv output.csv --log-file log.har

Source: workspace_w_3rd_party.py

  1#!/usr/bin/env python
  2"""
  3Create workspaces with 3rd party devices.
  4From a CSV read:
  5    * workspace name: if workspace doesn't exist a new workspace will be created
  6    * location name: must be an existing location
  7    * extension (optional); if missing a new extension will be generated starting at 2000
  8    * MAC address; if empty a new (dummy) MAC address will be generated as DEAD-DEAD-XXXX
  9    * password (optional); if missing a new (random password will be generated
 10The output is a CSV file with the following columns:
 11    * workspace name
 12    * location name
 13    * extension
 14    * MAC address
 15    * password
 16    * outbound proxy
 17    * SIP user name
 18    * line/port
 19
 20usage: workspace_w_3rd_party.py [-h] [--token TOKEN] [--dry-run]
 21                                [--log-file LOG_FILE] [--cleanup]
 22                                csv [output]
 23
 24Provision workspaces with 3rd party devices.
 25
 26positional arguments:
 27  csv                  CSV with workspaces to provision. CSV has the
 28                       following columns: * workspace name: the workspace
 29                       will be created * location name: must be an
 30                       existing location * extension (optional); if
 31                       missing a new extension will be generated starting
 32                       at 2000 * MAC address; if empty a new (dummy) MAC
 33                       address will be generated as DEAD-DEAD-XXXX *
 34                       password (optional); if missing a new (random
 35                       password will be generated
 36  output               Output CSV with the provisioning results. Not
 37                       required in dry-run mode
 38
 39optional arguments:
 40  -h, --help           show this help message and exit
 41  --token TOKEN        Access token can be provided using --token
 42                       argument, set in WEBEX_ACCESS_TOKEN environment
 43                       variable or can be a service app token. For the
 44                       latter set environment variables
 45                       ('SERVICE_APP_REFRESH_TOKEN',
 46                       'SERVICE_APP_CLIENT_ID',
 47                       'SERVICE_APP_CLIENT_SECRET'). Environment variables
 48                       can also be set in workspace_w_3rd_party.env
 49  --dry-run            Dry run, do not provision anything
 50  --log-file LOG_FILE  Log file. If extension is .har, log in HAR format
 51  --cleanup            remove workspaces
 52
 53Example: workspace_w_3rd_party.py input.csv output.csv --log-file log.har
 54"""
 55
 56import argparse
 57import asyncio
 58import csv
 59import logging
 60import os
 61import sys
 62from collections import defaultdict
 63from collections.abc import Generator
 64from contextlib import contextmanager
 65from dataclasses import dataclass, field
 66from itertools import chain, zip_longest
 67
 68from dotenv import load_dotenv
 69
 70from examples.service_app import SERVICE_APP_ENVS, env_path, get_tokens
 71from open_api.generated.Shared.workspaces_auto import WorkspaceType
 72from wxc_sdk.as_api import AsWebexSimpleApi
 73from wxc_sdk.common import DevicePlatform
 74from wxc_sdk.har_writer import HarWriter
 75from wxc_sdk.licenses import License
 76from wxc_sdk.telephony.devices import MACState, MACValidationResponse
 77from wxc_sdk.telephony.location import TelephonyLocation
 78from wxc_sdk.tokens import Tokens
 79from wxc_sdk.workspaces import (
 80    CallingType,
 81    Workspace,
 82    WorkspaceCalling,
 83    WorkspaceSupportedDevices,
 84    WorkspaceWebexCalling,
 85)
 86
 87MAC_VALIDATION_BATCH_SIZE = 100
 88
 89
 90@dataclass
 91class CSVRow:
 92    """
 93    CSV row with workspace and location name
 94    """
 95
 96    workspace_name: str
 97    location_name: str
 98    extension: str
 99    mac_address: str
100    password: str
101    workspace: Workspace = field(default=None, init=False)
102    calling_license_id: str = field(default=None, init=False)
103    location: TelephonyLocation = field(default=None, init=False)
104    outbound_proxy: str = field(default=None, init=False)
105    sip_user_name: str = field(default=None, init=False)
106    line_port: str = field(default=None, init=False)
107
108    def __post_init__(self):
109        # clean up some data
110        self.workspace_name = self.workspace_name.strip()
111        self.location_name = self.location_name.strip()
112        self.extension = self.extension.strip() or None
113        self.mac_address = self.mac_address.strip().lower() or None
114        self.password = self.password.strip() or None
115
116    @classmethod
117    def from_csv(cls, csv_path: str) -> Generator['CSVRow', None, None]:
118        """
119        Yield CSVRow instances from CSV file
120        """
121        err = False
122        with open(csv_path, newline='') as csv_file:
123            reader = csv.reader(csv_file)
124            for row_number, row in enumerate(reader, 1):
125                try:
126                    yield cls(*row)
127                except TypeError as te:
128                    err = True
129                    print(f'Failed to parse row {row_number}: {te}', file=sys.stderr)
130                    continue
131        if err:
132            print(f'Failed to parse {csv_path}', file=sys.stderr)
133            exit(1)
134        return
135
136
137@contextmanager
138def setup_logging(args: argparse.Namespace, api: AsWebexSimpleApi):
139    """
140    Set up logging
141    """
142
143    @contextmanager
144    def file_handler(log_file: str):
145        if not log_file:
146            yield
147        else:
148            # log to file or to HAR
149            if os.path.splitext(log_file)[-1].lower() == '.har':
150                with HarWriter(api=api, path=log_file):
151                    yield
152            else:
153                f_handler = logging.FileHandler(args.log_file)
154                f_handler.setLevel(logging.DEBUG)
155                f_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
156                logging.getLogger().addHandler(f_handler)
157                yield
158        return
159
160    logging.getLogger().setLevel(logging.DEBUG)
161    # create a console logging handler
162    console_handler = logging.StreamHandler()
163    console_handler.setLevel(logging.INFO)
164    console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
165    logging.getLogger().addHandler(console_handler)
166    with file_handler(args.log_file):
167        yield
168
169
170# list of validation errors
171ValidationResult = list[tuple[int, str]]
172
173
174async def validate_extensions(
175    *, api: AsWebexSimpleApi, location: TelephonyLocation, indexed_rows: list[tuple[int, CSVRow]]
176) -> ValidationResult:
177    """
178    Make sure that extensions are unique; if no extension is provided, generate a new one
179    """
180    extensions_in_location = {
181        number.extension
182        for number in await api.telephony.phone_numbers(location_id=location.location_id)
183        if number.extension is not None
184    }
185    errors = []
186    for row_index, csv_row in indexed_rows:
187        if csv_row.extension:
188            if csv_row.extension in extensions_in_location:
189                errors.append((row_index, f'Duplicate extension "{csv_row.extension}" in location "{location.name}"'))
190        else:
191            # generate new extension
192            csv_row.extension = next(str(ext) for ext in range(2000, 10000) if str(ext) not in extensions_in_location)
193            print(f'Row {row_index}: Generated new extension "{csv_row.extension}"')
194        extensions_in_location.add(csv_row.extension)
195
196    return errors
197
198
199async def generate_passwords(
200    *, api: AsWebexSimpleApi, location: TelephonyLocation, indexed_rows: list[tuple[int, CSVRow]]
201) -> ValidationResult:
202    """
203    Generate random passwords for rows without a password
204    """
205    new_passwords = await asyncio.gather(
206        *[
207            api.telephony.location.generate_password(location_id=location.location_id)
208            for _, csv_row in indexed_rows
209            if not csv_row.password
210        ]
211    )
212
213    for row_index, csv_row in indexed_rows:
214        if not csv_row.password:
215            # generate new password
216            csv_row.password = new_passwords.pop(0)
217            print(f'Row {row_index}: Generated new password "{csv_row.password}"')
218    return []
219
220
221async def validate_locations_and_extensions(
222    *, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool
223) -> ValidationResult:
224    """
225    Locations must exist and extensions must be unique
226    """
227    if cleanup:
228        return []
229    locations = {location.name: location for location in await api.telephony.locations.list()}
230    errors = []
231    for row_index, csv_row in enumerate(csv_rows, 1):
232        location = locations.get(csv_row.location_name)
233        if not location:
234            errors.append((row_index, f'Location "{csv_row.location_name}" does not exist'))
235        csv_row.location = location
236
237    # for all existing locations validate extensions
238    csv_rows_by_location: dict[str, list[tuple[int, CSVRow]]] = defaultdict(list)
239    for row_index, csv_row in enumerate(csv_rows, 1):
240        if csv_row.location_name in locations:
241            csv_rows_by_location[csv_row.location_name].append((row_index, csv_row))
242    tasks = [
243        validate_extensions(api=api, location=locations[l_name], indexed_rows=indexed_rows)
244        for l_name, indexed_rows in csv_rows_by_location.items()
245    ]
246    # also generate passwords for rows without a password
247    tasks.extend(
248        generate_passwords(api=api, location=locations[l_name], indexed_rows=indexed_rows)
249        for l_name, indexed_rows in csv_rows_by_location.items()
250    )
251    location_results = await asyncio.gather(*tasks)
252    errors.extend(chain.from_iterable(location_results))
253    return errors
254
255
256async def assign_new_mac_addresses(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow]) -> ValidationResult:
257    """
258    Get new MAC addresses for rows where no MAC address is provided
259    """
260    number_of_missing_mac_addresses = sum(1 for csv_row in csv_rows if not csv_row.mac_address)
261
262    if not number_of_missing_mac_addresses:
263        return []
264    mac_prefix = 'DEADDEAD'
265    mac_addresses_in_csv = {csv_row.mac_address for csv_row in csv_rows if csv_row.mac_address}
266
267    def mac_candidates() -> Generator[str, None, None]:
268        for v in range(0, 65536):
269            candidate = f'{mac_prefix}{hex(v)[2:].zfill(4).upper()}'
270            if candidate in mac_addresses_in_csv:
271                # skip mac addresses that are already in the csv
272                continue
273            yield f'{mac_prefix}{hex(v)[2:].zfill(4).upper()}'
274
275    new_mac_addresses = []
276
277    # test macs in batches
278    batch_args = [mac_candidates()] * MAC_VALIDATION_BATCH_SIZE
279
280    # noinspection PyArgumentList
281    batches = zip_longest(*batch_args)
282    for batch in batches:
283        validation_result = await api.telephony.devices.validate_macs(macs=list(batch))
284        errored_macs = set(ms.mac for ms in (validation_result.mac_status or []) if ms.state != MACState.available)
285        new_mac_addresses.extend(mac for mac in batch if mac not in errored_macs)
286        if len(new_mac_addresses) >= number_of_missing_mac_addresses:
287            break
288    for row_index, csv_row in enumerate(csv_rows, 1):
289        if not csv_row.mac_address:
290            # generate new MAC address
291            csv_row.mac_address = new_mac_addresses.pop(0)
292            print(f'Row {row_index}: Generated new MAC address "{csv_row.mac_address}"')
293    return []
294
295
296async def mac_addresses_available(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow]) -> ValidationResult:
297    """
298    Check if MAC addresses provided in CSV are available
299    """
300    errors = []
301    mac_addresses = list(set(csv_row.mac_address for csv_row in csv_rows if csv_row.mac_address))
302    if not mac_addresses:
303        return []
304
305    # validate in batches
306    batches = [
307        mac_addresses[i : i + MAC_VALIDATION_BATCH_SIZE]
308        for i in range(0, len(mac_addresses), MAC_VALIDATION_BATCH_SIZE)
309    ]
310    results = await asyncio.gather(*[api.telephony.devices.validate_macs(macs=batch) for batch in batches])
311    results: list[MACValidationResponse]
312
313    errored_macs: dict[str, str] = dict()
314    for result in results:
315        errored_macs.update(
316            (ms.mac, f'{ms.state}, {ms.message}') for ms in (result.mac_status or []) if ms.state != MACState.available
317        )
318
319    if not errored_macs:
320        return []
321    for row_index, csv_row in enumerate(csv_rows, 1):
322        if csv_row.mac_address and csv_row.mac_address in errored_macs:
323            errors.append((row_index, f'MAC address "{csv_row.mac_address}": {errored_macs[csv_row.mac_address]}'))
324    return errors
325
326
327async def validate_mac_addresses(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool) -> ValidationResult:
328    """
329    mac addresses must be unique and available
330    """
331    if cleanup:
332        return []
333    mac_addresses = set()
334    errors = []
335    # check if provided MAC addresses are unique
336    for row_index, csv_row in enumerate(csv_rows, 1):
337        if csv_row.mac_address:
338            if csv_row.mac_address in mac_addresses:
339                errors.append((row_index, f'Duplicate MAC address "{csv_row.mac_address}"'))
340            mac_addresses.add(csv_row.mac_address)
341
342    results = await asyncio.gather(
343        assign_new_mac_addresses(api=api, csv_rows=csv_rows), mac_addresses_available(api=api, csv_rows=csv_rows)
344    )
345    errors.extend(chain.from_iterable(results))
346    return errors
347
348
349async def validate_workspaces_and_licenses(
350    *, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool
351) -> ValidationResult:
352    """
353    Workspaces should not exist, also get license for new workspaces
354    """
355    tasks = [api.workspaces.list()]
356    if not cleanup:
357        tasks.append(api.licenses.list())
358    results = await asyncio.gather(*tasks)
359    workspace_list = results[0]
360    if cleanup:
361        licenses = []
362    else:
363        licenses = results[1]
364    workspace_list: list[Workspace]
365    licenses: list[License]
366    workspaces = {ws.display_name: ws for ws in workspace_list}
367
368    def calling_license_id() -> Generator[str, None, None]:
369        """
370        calling license id for license with available entitlement
371        """
372        candidate_licenses = [lic for lic in licenses if lic.webex_calling_workspaces or lic.webex_calling_professional]
373        # make sure to consume workspace licenses first
374        candidate_licenses.sort(key=lambda x: x.name, reverse=True)
375        for lic in candidate_licenses:
376            while lic.consumed_units < lic.total_units:
377                lic.consumed_units += 1
378                yield lic.license_id
379        return
380
381    errors = []
382    license_id_gen = calling_license_id()
383    for row_index, csv_row in enumerate(csv_rows, 1):
384        # check if workspace exists
385        ws = workspaces.get(csv_row.workspace_name)
386        csv_row.workspace = ws
387        if cleanup:
388            if not ws:
389                errors.append((row_index, f'Workspace "{csv_row.workspace_name}" does not exist'))
390            continue
391        if ws:
392            errors.append((row_index, f'Workspace "{csv_row.workspace_name}" already exists'))
393        # get a license for the workspace
394        try:
395            license_id = next(license_id_gen)
396        except StopIteration:
397            errors.append((row_index, f'No more licenses available for "{csv_row.workspace_name}"'))
398            continue
399        csv_row.calling_license_id = license_id
400    return errors
401
402
403async def validate_and_prepare(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow], cleanup: bool) -> None:
404    """
405    validate csv and prepare for provisioning
406        * location exists
407        * extensions are unique (if provided)
408        * MAC addresses are unique (if provided)
409        * workspace names are unique
410        * no devices in the workspace
411    """
412    results = await asyncio.gather(
413        validate_locations_and_extensions(api=api, csv_rows=csv_rows, cleanup=cleanup),
414        validate_mac_addresses(api=api, csv_rows=csv_rows, cleanup=cleanup),
415        validate_workspaces_and_licenses(api=api, csv_rows=csv_rows, cleanup=cleanup),
416    )
417    errors = list(chain.from_iterable(results))
418    errors: ValidationResult
419    errors.sort(key=lambda x: x[0])
420    if errors:
421        print('Validation errors:', file=sys.stderr)
422        for row_index, error in errors:
423            print(f'Row {row_index}: {error}', file=sys.stderr)
424        exit(1)
425    return
426
427
428async def delete_workspaces(*, api: AsWebexSimpleApi, csv_rows: list[CSVRow], dry_run: bool) -> None:
429    """
430    cleanup: delete workspaces
431    """
432
433    async def delete_one_workspace(workspace: Workspace) -> None:
434        if dry_run:
435            print(f'Delete workspace "{workspace.display_name}"')
436        else:
437            await api.workspaces.delete_workspace(workspace_id=workspace.workspace_id)
438            print(f'Deleted workspace "{workspace.display_name}"')
439        return
440
441    await asyncio.gather(*[delete_one_workspace(csv_row.workspace) for csv_row in csv_rows if csv_row.workspace])
442
443
444async def provision_row(*, api: AsWebexSimpleApi, csv_row: CSVRow) -> None:
445    """
446    Provision a single row
447    """
448    # create workspace
449    settings = Workspace(
450        location_id=csv_row.location.location_id,
451        display_name=csv_row.workspace_name,
452        type=WorkspaceType.desk,
453        capacity=1,
454        supported_devices=WorkspaceSupportedDevices.phones,
455        device_platform=DevicePlatform.cisco,
456        calling=WorkspaceCalling(
457            type=CallingType.webex,
458            webex_calling=WorkspaceWebexCalling(
459                licenses=[csv_row.calling_license_id],
460                extension=csv_row.extension,
461                location_id=csv_row.location.location_id,
462            ),
463        ),
464    )
465    workspace = await api.workspaces.create(settings=settings)
466    print(f'Provisioned workspace "{workspace.display_name}"')
467
468    # create device
469    device = await api.devices.create_by_mac_address(
470        mac=csv_row.mac_address,
471        workspace_id=workspace.workspace_id,
472        model='Generic IPPhone Customer Managed',
473        password=csv_row.password,
474    )
475
476    details = await api.telephony.devices.details(device_id=device.device_id)
477    print(f'Provisioned device in workspace "{workspace.display_name}"')
478    csv_row.sip_user_name = details.owner.sip_user_name
479    csv_row.line_port = details.owner.line_port
480    csv_row.outbound_proxy = details.proxy.outbound_proxy
481
482    return
483
484
485def main():
486    async def as_main():
487        # read CSV file
488        csv_rows = list(CSVRow.from_csv(csv_file))
489        async with AsWebexSimpleApi(tokens=tokens) as api:
490            with setup_logging(args, api):
491                # validation and preparation for provisioning
492                await validate_and_prepare(api=api, csv_rows=csv_rows, cleanup=args.cleanup)
493                if args.cleanup:
494                    await delete_workspaces(api=api, csv_rows=csv_rows, dry_run=args.dry_run)
495                    return
496
497                if args.dry_run:
498                    print('Dry run, not provisioning anything')
499                    return
500                await asyncio.gather(*[provision_row(api=api, csv_row=csv_row) for csv_row in csv_rows])
501                # write output
502                if args.output:
503                    with open(args.output, 'w', newline='') as output:
504                        writer = csv.writer(output)
505                        writer.writerow(
506                            [
507                                'workspace_name',
508                                'location_name',
509                                'extension',
510                                'mac_address',
511                                'password',
512                                'outbound_proxy',
513                                'sip_user_name',
514                                'line_port',
515                            ]
516                        )
517                        for csv_row in csv_rows:
518                            writer.writerow(
519                                [
520                                    csv_row.workspace_name,
521                                    csv_row.location_name,
522                                    csv_row.extension,
523                                    csv_row.mac_address,
524                                    csv_row.password,
525                                    csv_row.outbound_proxy,
526                                    csv_row.sip_user_name,
527                                    csv_row.line_port,
528                                ]
529                            )
530                    # for
531                # with open
532            # with setup_logging
533        # async with AsWebexSimpleApi
534        return
535
536    # parse arguments
537    parser = argparse.ArgumentParser(
538        description='Provision workspaces with 3rd party devices.',
539        epilog='Example: %(prog)s input.csv output.csv --log-file log.har',
540    )
541    parser.add_argument(
542        'csv',
543        type=str,
544        help="""CSV with workspaces to provision. CSV has the following columns:
545    * workspace name: the workspace will be created
546    * location name: must be an existing location
547    * extension (optional); if missing a new extension will be generated starting at 2000
548    * MAC address; if empty a new (dummy) MAC address will be generated as DEAD-DEAD-XXXX
549    * password (optional); if missing a new (random password will be generated""",
550    )
551    parser.add_argument(
552        'output', nargs='?', type=str, help='Output CSV with the provisioning results. Not required in dry-run mode'
553    )
554    parser.add_argument(
555        '--token',
556        help=f'Access token can be provided using --token argument, set in '
557        f'WEBEX_ACCESS_TOKEN environment variable or can be a service app token. For '
558        f'the latter set environment variables {SERVICE_APP_ENVS}. Environment '
559        f'variables can also be set in {env_path()}',
560    )
561    parser.add_argument('--dry-run', action='store_true', help='Dry run, do not provision anything')
562    parser.add_argument('--log-file', help='Log file. If extension is .har, log in HAR format')
563    parser.add_argument('--cleanup', action='store_true', help='remove workspaces')
564    args = parser.parse_args()
565    if not any((args.output, args.dry_run, args.cleanup)):
566        parser.error('Output file is required if not dry-run cleanup mode')
567    if args.output and (args.dry_run or args.cleanup):
568        parser.error('Output file is not used in dry-run cleanup mode')
569    csv_file = args.csv
570    if not os.path.isfile(csv_file):
571        print(f'File {csv_file} does not exist', file=sys.stderr)
572        exit(1)
573
574    # read tokens
575    load_dotenv(env_path())
576    tokens = get_tokens() if args.token is None else Tokens(access_token=args.token)
577    if tokens is None:
578        print(
579            f'Access token can be provided using --token argument, set in WEBEX_ACCESS_TOKEN environment variable '
580            f'or can be a service app token. For the latter set environment variables {SERVICE_APP_ENVS}. Environment '
581            f'variables can also be set in {env_path()}',
582            file=sys.stderr,
583        )
584        exit(1)
585
586    asyncio.run(as_main())
587
588
589if __name__ == '__main__':
590    main()