Welcome to Bosen’s documentation!

class petuum::AbstractRow

This class defines the interface of the Row type.

ApplyUpdate() and ApplyBatchUpdate() must be safe to call concurrently with each other and with any other functions that application threads may invoke. The Petuum system does not require the other functions to be thread-safe.

Public Functions

virtual ~AbstractRow()
virtual void Init(int32_t capacity) = 0
virtual AbstractRow *Clone() const = 0
virtual size_t get_update_size() const = 0
virtual size_t SerializedSize() const = 0

Return
An upper bound on the number of bytes that the serialized row will occupy. Implementations should balance the tightness of the bound against the time it takes to compute it.

virtual size_t Serialize(void *bytes) const = 0

Return
The exact size of serialized row.
Parameters
  • bytes: points to a chunk of allocated memory whose size is guaranteed to be at least SerializedSize(). Need not be thread-safe.

virtual bool Deserialize(const void *data, size_t num_bytes) = 0

Deserialize and initialize a row.

Init() has not been called on the row when this function is invoked. Need not be thread-safe.

Return
true on success, false otherwise.
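
As a rough illustration of the SerializedSize() / Serialize() / Deserialize() contract, the sketch below uses a hypothetical standalone class, DenseFloatRowSketch. It is not a Petuum class and does not derive from AbstractRow; the float storage and the byte layout are assumptions made only for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for a dense row of floats; not part of Petuum.
class DenseFloatRowSketch {
 public:
  void Init(int32_t capacity) {
    data_.assign(static_cast<std::size_t>(capacity), 0.0f);
  }

  // Upper bound on the serialized size; for this dense layout it is exact.
  std::size_t SerializedSize() const { return data_.size() * sizeof(float); }

  // Writes the row into `bytes` (at least SerializedSize() bytes long) and
  // returns the exact number of bytes written.
  std::size_t Serialize(void *bytes) const {
    const std::size_t num_bytes = data_.size() * sizeof(float);
    if (num_bytes > 0) std::memcpy(bytes, data_.data(), num_bytes);
    return num_bytes;
  }

  // Initializes the row from serialized data; Init() need not be called first.
  bool Deserialize(const void *data, std::size_t num_bytes) {
    if (num_bytes % sizeof(float) != 0) return false;  // malformed input
    data_.resize(num_bytes / sizeof(float));
    if (num_bytes > 0) std::memcpy(data_.data(), data, num_bytes);
    return true;
  }

  float &operator[](int32_t column_id) { return data_[column_id]; }
  std::size_t size() const { return data_.size(); }

 private:
  std::vector<float> data_;
};
```

A caller would allocate SerializedSize() bytes, pass the buffer to Serialize(), and hand the returned byte count to Deserialize() on the receiving side.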

virtual void ResetRowData(const void *data, size_t num_bytes) = 0

Init or Deserialize has been called on this row.

Need not be thread-safe.

virtual void GetWriteLock() = 0

The write lock ensures mutual exclusion for non-thread-safe functions.

virtual void ReleaseWriteLock() = 0

The write lock ensures mutual exclusion for non-thread-safe functions.

virtual double ApplyIncGetImportance(int32_t column_id, const void *update) = 0

Thread safe.

virtual double ApplyBatchIncGetImportance(const int32_t *column_ids, const void *update_batch, int32_t num_updates) = 0

Thread safe.

Return
importance.
Parameters
  • update_batch: points to where updates are stored contiguously in the memory.

virtual double ApplyIncUnsafeGetImportance(int32_t column_id, const void *update) = 0

Not necessarily thread-safe.

PS guarantees to not call this function concurrently with other functions or itself.

virtual double ApplyBatchIncUnsafeGetImportance(const int32_t *column_ids, const void *update_batch, int32_t num_updates) = 0

Not necessarily thread-safe.

PS guarantees to not call this function concurrently with other functions or itself.

virtual void ApplyInc(int32_t column_id, const void *update) = 0

Thread safe.

virtual void ApplyBatchInc(const int32_t *column_ids, const void *update_batch, int32_t num_updates) = 0

Thread safe.

Parameters
  • update_batch: points to where updates are stored contiguously in the memory.
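
The relationship between the thread-safe and the Unsafe batch variants can be sketched as follows. LockedFloatRowSketch is a hypothetical class, not Petuum code; it assumes float updates and a single std::mutex per row, which may differ from how real row types synchronize.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical dense float row; not a Petuum class.
class LockedFloatRowSketch {
 public:
  explicit LockedFloatRowSketch(int32_t capacity) : data_(capacity, 0.0f) {}

  // Thread-safe variant: takes the row lock, then applies the batch.
  void ApplyBatchInc(const int32_t *column_ids, const void *update_batch,
                     int32_t num_updates) {
    std::lock_guard<std::mutex> guard(mtx_);
    ApplyBatchIncUnsafe(column_ids, update_batch, num_updates);
  }

  // Unsafe variant: the caller guarantees there is no concurrent access.
  void ApplyBatchIncUnsafe(const int32_t *column_ids, const void *update_batch,
                           int32_t num_updates) {
    // update_batch holds num_updates floats stored contiguously;
    // update i belongs to column column_ids[i].
    const float *updates = static_cast<const float *>(update_batch);
    for (int32_t i = 0; i < num_updates; ++i) {
      data_[column_ids[i]] += updates[i];
    }
  }

  float Get(int32_t column_id) const { return data_[column_id]; }

 private:
  std::mutex mtx_;
  std::vector<float> data_;
};
```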

virtual void ApplyIncUnsafe(int32_t column_id, const void *update) = 0

Not necessarily thread-safe.

PS guarantees to not call this function concurrently with other functions or itself.

virtual void ApplyBatchIncUnsafe(const int32_t *column_ids, const void *update_batch, int32_t num_updates) = 0

Not necessarily thread-safe.

PS guarantees to not call this function concurrently with other functions or itself.

virtual double ApplyDenseBatchIncGetImportance(const void *update_batch, int32_t index_st, int32_t num_updates) = 0

Parameters
  • update_batch: contains an update for each element within the capacity of the row, in order of increasing column_ids.

virtual void ApplyDenseBatchInc(const void *update_batch, int32_t index_st, int32_t num_updates) = 0

Parameters
  • update_batch: contains an update for each element within the capacity of the row, in order of increasing column_ids.

virtual double ApplyDenseBatchIncUnsafeGetImportance(const void *update_batch, int32_t index_st, int32_t num_updates) = 0

Parameters
  • update_batch: contains an update for each element within the capacity of the row, in order of increasing column_ids.

virtual void ApplyDenseBatchIncUnsafe(const void *update_batch, int32_t index_st, int32_t num_updates) = 0

Parameters
  • update_batch: contains an update for each element within the capacity of the row, in order of increasing column_ids.
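
A dense batch carries no column_ids array, so the target columns have to be derived from index_st. The helper below is a hypothetical sketch, not a Petuum function. It assumes float updates and that update i targets column index_st + i; the reference above does not spell out that mapping.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper; not a Petuum function. Assumes update_batch holds
// num_updates floats and that update i targets column index_st + i.
inline void ApplyDenseBatchIncSketch(std::vector<float> *row,
                                     const void *update_batch,
                                     int32_t index_st, int32_t num_updates) {
  const float *updates = static_cast<const float *>(update_batch);
  for (int32_t i = 0; i < num_updates; ++i) {
    (*row)[index_st + i] += updates[i];
  }
}
```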

virtual void AddUpdates(int32_t column_id, void *update1, const void *update2) const = 0

Aggregate update1 and update2 by summation (update1 + update2), writing the result to update1 (update2 is const).

column_id may be used when updates are applied differently to different columns of a row.

Both AddUpdates() and SubtractUpdates() should behave like static methods, but C++ does not allow virtual static methods. They need to be thread-safe and should preferably support concurrent execution. They must work correctly even if neither Init() nor Deserialize() has been called.

virtual void SubtractUpdates(int32_t column_id, void *update1, const void *update2) const = 0

Aggregate update1 and update2 by subtraction (update1 - update2), writing the result to update1 (update2 is const).

column_id may be used when updates are applied differently to different columns of a row.

Both AddUpdates() and SubtractUpdates() should behave like static methods, but C++ does not allow virtual static methods. They need to be thread-safe and should preferably support concurrent execution. They must work correctly even if neither Init() nor Deserialize() has been called.
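
For a plain float update type, the two aggregation functions might look like the sketch below. The free functions AddUpdatesSketch and SubtractUpdatesSketch are hypothetical and only mirror the documented parameter lists; they touch no row state, which is what allows them to work without Init() or Deserialize().

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical free functions mirroring the documented signatures for a
// float update type; not Petuum code. The result goes to update1 because
// update2 is const.
inline void AddUpdatesSketch(int32_t /*column_id*/, void *update1,
                             const void *update2) {
  *static_cast<float *>(update1) += *static_cast<const float *>(update2);
}

inline void SubtractUpdatesSketch(int32_t /*column_id*/, void *update1,
                                  const void *update2) {
  *static_cast<float *>(update1) -= *static_cast<const float *>(update2);
}
```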

virtual double GetImportance(int32_t column_id, const void *update, const void *value) const = 0

Return
The importance of this update as if it were applied to the given value.

virtual double GetImportance(int32_t column_id, const void *update) const = 0
virtual double GetAccumImportance(const int32_t *column_ids, const void *update_batch, int32_t num_updates) const = 0
virtual double GetDenseAccumImportance(const void *update_batch, int32_t index_st, int32_t num_updates) const = 0
virtual void InitUpdate(int32_t column_id, void *zero) const = 0

Initialize update.

Initialized update represents “zero update”. In other words, 0 + u = u (0 is the zero update).

virtual bool CheckZeroUpdate(const void *update) const = 0
struct petuum::ClientTableConfig

ClientTableConfig is used by client only.

Public Functions

ClientTableConfig()

Public Members

TableInfo table_info
size_t process_cache_capacity

In # of rows.

size_t thread_cache_capacity

In # of rows.

size_t oplog_capacity

Estimated upper bound # of pending oplogs in terms of # of rows.

For SSP this is the # of rows that all threads collectively touch in a Clock().

OpLogType oplog_type
AppendOnlyOpLogType append_only_oplog_type
size_t append_only_buff_capacity
size_t per_thread_append_only_buff_pool_size

per partition

int32_t bg_apply_append_oplog_freq
ProcessStorageType process_storage_type
bool no_oplog_replay
template <typename UPDATE>
class petuum::DenseUpdateBatch

This class is provided mainly for convenience rather than for complete functionality.

It has no dependency on other classes in Petuum PS.

Public Functions

DenseUpdateBatch(int32_t index_st, int32_t num_updates)

Underlying updates are not initialized.

UPDATE &operator[](int32_t index)
void *get_mem()
const void *get_mem_const() const
int32_t get_index_st() const
int32_t get_num_updates() const
struct petuum::HostInfo

Public Functions

HostInfo()
~HostInfo()
HostInfo(int32_t _id, std::string _ip, std::string _port)
HostInfo(const HostInfo &other)
HostInfo &operator=(const HostInfo &other)

Public Members

int32_t id
std::string ip
std::string port

class petuum::PSTableGroup

Public Static Functions

static int Init(const TableGroupConfig &table_group_config, bool table_access)

Can be called only once per process.

Must be called after RegisterRow() so that it acts as a barrier for CreateTable(), and must “happen before” any other call to TableGroup. The thread that calls Init() is referred to as the init thread. If the init thread needs to access the table API (e.g., the init thread is itself a worker thread), it should set table_access to true. The init thread is responsible for calling RegisterRow(), CreateTable() and ShutDown(); calling those functions from other threads is not allowed. The init thread does not need to call RegisterThread() or DeregisterThread().

static void ShutDown()

The init thread needs to call ShutDown() after all other app threads have deregistered.

Any other call to TableGroup and Table API must return before calling ShutDown().

template <typename ROW>
static void RegisterRow(int32_t row_type)

Should be called before Init().

Not thread-safe. We strongly recommend calling RegisterRow() from the init thread to avoid race conditions.

static bool CreateTable(int32_t table_id, const ClientTableConfig &table_config)
static void CreateTableDone()

Must be called by Init thread after creating all tables and before any other thread calls RegisterThread().

static void WaitThreadRegister()

Called only by the init thread, before it accesses any table API.

Must be called after CreateTableDone(). If the init thread does not access the table API, calling this function makes no difference.

template <typename UPDATE>
static Table<UPDATE> GetTableOrDie(int32_t table_id)

Getter; terminates if the table is not found. GetTableOrDie() is thread-safe with respect to other calls to GetTableOrDie().

static int32_t RegisterThread()

Every app thread except the init thread should register itself before accessing any Table API.

In SSP mode, if a thread invokes RegisterThread with true, its clock will be kept track of, so it should call Clock() properly.

static void DeregisterThread()

A registered thread must deregister itself.

static void Clock()

Advance clock for the application thread.

static void GlobalBarrier()

Called by application threads that access table API (referred to as table threads).

Threads that call GlobalBarrier() must be at the same clock. 1) A table thread may not go beyond the barrier until all table threads have reached the barrier; 2) table threads that move beyond the barrier are guaranteed to see the updates that other table threads have applied to the table.
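
The ordering constraints described for PSTableGroup can be summarized as a call sequence. The sketch below uses TableGroupStub, a hypothetical stand-in that only records call names; it is not the real petuum::PSTableGroup (whose functions are static), and its simplified parameter lists are assumptions. The worker body runs inline to keep the example deterministic, whereas a real application would run it on a separate thread.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in that only records which call was made; it is NOT the real
// petuum::PSTableGroup and performs no parameter-server work.
struct TableGroupStub {
  std::vector<std::string> calls;
  void RegisterRow() { calls.push_back("RegisterRow"); }
  void Init(bool /*table_access*/) { calls.push_back("Init"); }
  void CreateTable(int /*table_id*/) { calls.push_back("CreateTable"); }
  void CreateTableDone() { calls.push_back("CreateTableDone"); }
  void RegisterThread() { calls.push_back("RegisterThread"); }
  void Clock() { calls.push_back("Clock"); }
  void DeregisterThread() { calls.push_back("DeregisterThread"); }
  void ShutDown() { calls.push_back("ShutDown"); }
};

// Body of a worker (table) thread.
inline void WorkerSketch(TableGroupStub *tg) {
  tg->RegisterThread();    // before touching any Table API
  tg->Clock();             // advance this thread's clock after its work
  tg->DeregisterThread();  // every registered thread must deregister
}

// Body of the init thread, here with table_access = false.
inline std::vector<std::string> InitThreadSketch() {
  TableGroupStub tg;
  tg.RegisterRow();      // before Init()
  tg.Init(false);        // once per process
  tg.CreateTable(0);     // one call per table
  tg.CreateTableDone();  // before any worker calls RegisterThread()
  WorkerSketch(&tg);     // run inline here; normally a separate thread
  tg.ShutDown();         // after all workers have deregistered
  return tg.calls;
}
```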

class petuum::RowAccessor

RowAccessor is a “smart pointer” for ROW: row_accessor.Get() gives a const reference.

We disallow copying of RowAccessor to avoid unnecessary manipulation of reference count.

Inherits from noncopyable

Public Functions

RowAccessor()
~RowAccessor()
template <typename ROW>
const ROW &Get()

The returned reference is guaranteed to be valid only during the lifetime of this RowAccessor.

struct petuum::RowOpLogType

Public Static Attributes

const int32_t kDenseRowOpLog = 0
const int32_t kSparseRowOpLog = 1
const int32_t kSparseVectorRowOpLog = 2
template <typename UPDATE>
class petuum::Table

The user table stores a lightweight pointer to ClientTable.

Public Functions

Table()
Table(AbstractClientTable *system_table)
Table(const Table &table)
Table &operator=(const Table &table)
void GetAsyncForced(int32_t row_id)
void GetAsync(int32_t row_id)
void WaitPendingAsyncGet()
void ThreadGet(int32_t row_id, ThreadRowAccessor *row_accessor)
void ThreadInc(int32_t row_id, int32_t column_id, UPDATE update)
void ThreadBatchInc(int32_t row_id, const UpdateBatch<UPDATE> &update_batch)
void ThreadDenseBatchInc(int32_t row_id, const DenseUpdateBatch<UPDATE> &update_batch)
void FlushThreadCache()
void Get(int32_t row_id, RowAccessor *row_accessor)

row_accessor helps maintain the reference count to prevent premature cache eviction.

template <typename ROW>
const ROW &Get(int32_t row_id, RowAccessor *row_accessor = 0)
void Inc(int32_t row_id, int32_t column_id, UPDATE update)
void BatchInc(int32_t row_id, const UpdateBatch<UPDATE> &update_batch)
void DenseBatchInc(int32_t row_id, const DenseUpdateBatch<UPDATE> &update_batch)
int32_t get_row_type() const
struct petuum::TableGroupConfig

Public Functions

TableGroupConfig()

Public Members

std::string stats_path
int32_t num_comm_channels_per_client

Total number of servers in the system.

Global parameters have to be the same across all processes.

int32_t num_tables

Total number of tables the PS will have.

Each init thread must make num_tables CreateTable() calls. Global parameters have to be the same across all processes.

int32_t num_total_clients

Total number of clients in the system.

Global parameters have to be the same across all processes.

int32_t num_local_app_threads

Number of local application threads, including the init thread.

Local parameters can differ between processes, but have to sum up to global parameters.

std::map<int32_t, HostInfo> host_map

Maps server ID to host info.

int32_t client_id

My client id.

bool aggressive_clock

If set to true, oplog send is triggered on every Clock() call.

If set to false, oplog is only sent if the process clock (representing all app threads) has advanced. Aggressive clock may reduce memory footprint and improve the per-clock convergence rate at the cost of performance. Default is false (suggested).

ConsistencyModel consistency_model
int32_t aggressive_cpu
int32_t server_ring_size

In Async+pushing,.

int32_t snapshot_clock
int32_t resume_clock
std::string snapshot_dir
std::string resume_dir
std::string ooc_path_prefix
UpdateSortPolicy update_sort_policy
long bg_idle_milli

In number of milliseconds.

If the bg thread wakes up and finds there’s no work to do, it goes back to sleep for this much time or until it receives a message.

double bandwidth_mbps

Bandwidth in Megabits per second.

size_t oplog_push_upper_bound_kb

Upper bound on update message size, in kilobytes.

int32_t oplog_push_staleness_tolerance
size_t thread_oplog_batch_size
size_t server_push_row_threshold
long server_idle_milli
long server_row_candidate_factor
struct petuum::TableInfo

TableInfo is shared between client and server.

Public Functions

TableInfo()

Public Members

int32_t table_staleness

table_staleness is used for SSP and ClockVAP.

int32_t row_type

A table can only have one type of row.

The row_type is defined when calling TableGroup::RegisterRow().

size_t row_capacity

row_capacity can mean different things for different row types.

For example, in a vector-backed dense row it is the maximum number of columns. This parameter is ignored for sparse rows.

bool oplog_dense_serialized
int32_t row_oplog_type
size_t dense_row_oplog_capacity
class petuum::ThreadRowAccessor

Inherits from noncopyable

Public Functions

ThreadRowAccessor()
~ThreadRowAccessor()
template <typename ROW>
const ROW &Get()

The returned reference is guaranteed to be valid only during the lifetime of this ThreadRowAccessor.

template <typename UPDATE>
class petuum::UpdateBatch

UPDATE can be V = {int, double, ...} when the entry is a simple numerical type.

If the entry type is a struct, UPDATE can be an id:val pair, such as {1:5}, which increments field 1 of the struct by 5.

Public Functions

UpdateBatch()
UpdateBatch(size_t num_updates)
~UpdateBatch()
UpdateBatch(const UpdateBatch &other)
UpdateBatch &operator=(const UpdateBatch &other)
void Update(int32_t column_id, const UPDATE &update)
void UpdateSet(int32_t idx, int32_t column_id, const UPDATE &update)
const std::vector<int32_t> &GetColIDs() const
const UPDATE *GetUpdates() const
int32_t GetBatchSize() const
class petuum::HighResolutionTimer

This is a simpler implementation of timer to replace boost::high_resolution_timer.

Code based on http://boost.2283326.n4.nabble.com/boost-shared-mutex-performance-td2659061.html

Public Functions

HighResolutionTimer()
void restart()
double elapsed() const

Return
elapsed time (including previous restart-to-pause time) in seconds.

double elapsed_max() const

Return
estimated maximum value for elapsed()

double elapsed_min() const

Return
minimum value for elapsed()

class petuum::SharedMutex

Read-write mutex, almost equivalent to boost::shared_mutex (but without timed locking).

Benchmark shows that this is about 2x faster than boost::shared_mutex on Ubuntu 12.04.

Usage:

    {
      SharedMutex rw_mutex;
      boost::unique_lock<SharedMutex> lock(rw_mutex);
      // Do things with exclusive lock
    }
    {
      SharedMutex rw_mutex;
      boost::shared_lock<SharedMutex> lock(rw_mutex);
      // Do things with shared lock
    }

Code adapted from http://boost.2283326.n4.nabble.com/boost-shared-mutex-performance-td2659061.html
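
The same exclusive/shared locking pattern can be tried without Petuum or Boost. The sketch below substitutes the standard library's std::shared_timed_mutex (C++14) for petuum::SharedMutex, purely as a stand-in; GuardedCounterSketch is a hypothetical class used only for illustration.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>

// Hypothetical guarded counter. std::shared_timed_mutex stands in for
// petuum::SharedMutex, which is not available in this sketch.
class GuardedCounterSketch {
 public:
  // Writers take the exclusive lock.
  void Increment() {
    std::unique_lock<std::shared_timed_mutex> lock(rw_mutex_);
    ++value_;
  }

  // Readers take the shared lock, so several readers may proceed at once.
  int Read() const {
    std::shared_lock<std::shared_timed_mutex> lock(rw_mutex_);
    return value_;
  }

 private:
  mutable std::shared_timed_mutex rw_mutex_;
  int value_ = 0;
};
```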

Inherits from petuum::Lockable

Subclassed by petuum::RecursiveSharedMutex

Public Functions

SharedMutex()
~SharedMutex()
void lock()

Blocks until a lock can be obtained for the current execution agent.

If an exception is thrown, no lock is obtained.

bool try_lock()

Attempts to acquire the lock for the current execution agent without blocking.

If an exception is thrown, no lock is obtained.

Return
true if the lock was acquired, false otherwise

void unlock()

Releases the lock held by the execution agent.

Throws no exceptions. Requires: the current execution agent must hold the lock.

void lock_shared()
bool try_lock_shared()
void unlock_shared()
class petuum::RecursiveSharedMutex

Provide recursive lock with write lock counting.

Note the function override hides those in SharedMutex.

Inherits from petuum::SharedMutex

Public Functions

RecursiveSharedMutex()
~RecursiveSharedMutex()
void lock()

Blocks until a lock can be obtained for the current execution agent.

If an exception is thrown, no lock is obtained.

bool try_lock()

Attempts to acquire the lock for the current execution agent without blocking.

If an exception is thrown, no lock is obtained.

Return
true if the lock was acquired, false otherwise

void unlock()

Releases the lock held by the execution agent.

Throws no exceptions. Requires: the current execution agent must hold the lock.

void lock_shared()
bool try_lock_shared()
void unlock_shared()
template <typename MUTEX = std::mutex>
class petuum::Unlocker

Takes an already acquired lock and unlocks it in the destructor.

Inherits from noncopyable

Public Functions

Unlocker()
~Unlocker()
void SetLock(MUTEX *lock)

lock must have been locked already.

It does not take ownership.

void Release()

Release the lock.

MUTEX *GetAndRelease()
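
The behavior described for Unlocker can be approximated with a small RAII helper. UnlockerSketch below is a hypothetical re-creation, not Petuum's implementation; in particular, the assumption that GetAndRelease() hands the still-locked mutex back to the caller is inferred from the name only.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical re-creation of the documented behavior; not Petuum's code.
template <typename MUTEX = std::mutex>
class UnlockerSketch {
 public:
  UnlockerSketch() : lock_(nullptr) {}
  ~UnlockerSketch() {
    if (lock_ != nullptr) lock_->unlock();
  }
  UnlockerSketch(const UnlockerSketch &) = delete;
  UnlockerSketch &operator=(const UnlockerSketch &) = delete;

  // `lock` must already be locked; ownership of the mutex is not taken.
  void SetLock(MUTEX *lock) { lock_ = lock; }

  // Unlocks now instead of waiting for the destructor.
  void Release() {
    if (lock_ != nullptr) {
      lock_->unlock();
      lock_ = nullptr;
    }
  }

  // Assumption: returns the still-locked mutex and stops tracking it.
  MUTEX *GetAndRelease() {
    MUTEX *held = lock_;
    lock_ = nullptr;
    return held;
  }

 private:
  MUTEX *lock_;
};
```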
class petuum::Lockable

The Lockable concept (implemented as interface/abstract class) describes the characteristics of types that provide exclusive blocking semantics for execution agents (i.e. threads).

Subclassed by petuum::SharedMutex, petuum::SpinMutex

Public Functions

virtual void lock() = 0

Blocks until a lock can be obtained for the current execution agent.

If an exception is thrown, no lock is obtained.

virtual void unlock() = 0

Releases the lock held by the execution agent.

Throws no exceptions. Requires: the current execution agent must hold the lock.

virtual bool try_lock() = 0

Attempts to acquire the lock for the current execution agent without blocking.

If an exception is thrown, no lock is obtained.

Return
true if the lock was acquired, false otherwise

class petuum::MemBlock

A thin layer to manage a chunk of contiguous memory which may be allocated outside (via MemAlloc) or inside (via Alloc) the class.

This class is meant to be simple (cheap), so it does not check any of the assumptions a function relies on when that function is invoked. If an assumption is not satisfied, the behavior of the current and future operations on MemBlock is undefined.

Inherits from noncopyable

Public Functions

MemBlock()
~MemBlock()
void Reset(void *mem)

Reset the MemBlock object to manage a particular memory chunk of certain size.

If there is a previous memory chunk managed by this object, it is freed. Once a memory chunk is given to MemBlock, it cannot be freed externally.

uint8_t *Release()

Release the control over the memory chunk without destroying it.

uint8_t *get_mem()

Get a pointer to access to the underlying memory managed by this MemBlock.

void Alloc(int32_t size)

Allocate a chunk of memory based on the size information.

Must be invoked when there’s no memory managed by this MemBlock object yet.

Public Static Functions

static uint8_t *MemAlloc(int32_t nbytes)
static void MemFree(uint8_t *mem)
template <typename T>
class petuum::MPMCQueue

MPMCQueue is a multi-producer-multi-consumer bounded buffer.

Inherits from noncopyable

Public Functions

MPMCQueue(size_t capacity)
~MPMCQueue()
size_t get_size() const
void Push(const T &value)
bool Pop(T *value)
template <typename T>
class petuum::MTQueue

Wrap around std::queue and provide thread safety (MT = multi-threaded).

Public Functions

MTQueue()
int pop(T *val)

Return 0 if it’s empty, 1 if not.

val is unset if returning 0. Note that the semantics differ from std::queue::pop.

void push(const T &val)
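
A queue with these pop() semantics could be written along the following lines. MTQueueSketch is a hypothetical minimal version for illustration, not the Petuum source; it assumes a single std::mutex guarding a std::queue.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical minimal thread-safe queue; not Petuum's implementation.
template <typename T>
class MTQueueSketch {
 public:
  // Returns 0 if the queue is empty (val is left untouched), 1 otherwise.
  // Unlike std::queue::pop, the front element is copied out to *val.
  int pop(T *val) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (q_.empty()) return 0;
    *val = q_.front();
    q_.pop();
    return 1;
  }

  void push(const T &val) {
    std::lock_guard<std::mutex> lock(mutex_);
    q_.push(val);
  }

 private:
  std::mutex mutex_;
  std::queue<T> q_;
};
```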
class petuum::RecordBuff

A buffer that allows appending records to it.

Inherits from noncopyable

Public Functions

RecordBuff()
RecordBuff(void *mem, size_t size)
~RecordBuff()
RecordBuff(RecordBuff &&other)
void *ResetMem(void *mem, size_t size)

Does not take ownership of the memory.

void ResetOffset()
bool Append(int32_t record_id, const void *record, size_t record_size)
size_t GetMemUsedSize()
int32_t *GetMemPtrInt32()
void PrintInfo() const
class petuum::SparseVector

Fixed size sparse vector.

Inherits from noncopyable

Public Functions

SparseVector(size_t capacity, size_t value_size)
~SparseVector()
size_t get_capacity() const
size_t get_size() const
uint8_t *GetValPtr(int32_t key)
const uint8_t *GetValPtrConst(int32_t key) const
uint8_t *GetByIdx(int32_t index, int32_t *key)
const uint8_t *GetByIdxConst(int32_t index, int32_t *key) const
void Copy(const SparseVector &src)
void Reset()
void Compact(int32_t index)
uint8_t *get_data_ptr()
template <typename K, typename MUTEX = std::mutex, typename HASH = std::hash<K>>
class petuum::StripedLock

StripedLock does not support scoped lock.

MUTEX must implement Lockable interface.

Inherits from noncopyable

Public Functions

StripedLock()

Determine number of locks based on number of cores.

StripedLock(int lock_pool_size)

Initialize with number of locks in the pool.

void Lock(K idx)

Lock index idx.

void Lock(K idx, Unlocker<MUTEX> *unlocker)

Lock index idx, and let unlocker unlock it later on.

bool TryLock(K idx)

Try to lock index idx.

bool TryLock(K idx, Unlocker<MUTEX> *unlocker)

Try to lock index idx, and let unlocker unlock it later on.

void Unlock(K idx)

Unlock.
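
Lock striping maps every key to one of a fixed pool of mutexes, so keys that hash to the same slot share a lock. The sketch below illustrates that idea with a hypothetical StripedLockSketch; the modulo mapping and the Stripe() accessor are assumptions for this example and are not taken from the Petuum source.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>

// Hypothetical illustration of lock striping; not Petuum's implementation.
template <typename K, typename MUTEX = std::mutex,
          typename HASH = std::hash<K>>
class StripedLockSketch {
 public:
  explicit StripedLockSketch(int lock_pool_size)
      : pool_size_(lock_pool_size), locks_(new MUTEX[lock_pool_size]) {}

  void Lock(K idx) { locks_[Stripe(idx)].lock(); }
  bool TryLock(K idx) { return locks_[Stripe(idx)].try_lock(); }
  void Unlock(K idx) { locks_[Stripe(idx)].unlock(); }

  // Index of the pool entry guarding idx; colliding keys share one mutex.
  std::size_t Stripe(K idx) const { return hasher_(idx) % pool_size_; }

 private:
  std::size_t pool_size_;
  std::unique_ptr<MUTEX[]> locks_;
  HASH hasher_;
};
```

Because colliding keys share a mutex, a thread should not hold the lock for one key while locking another key that may map to the same stripe.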

class petuum::NanoTimer

Public Functions

NanoTimer()
int Start(int32_t _interval, TimerHandler _handler, void *_handler_argu)
int Stop()
class petuum::VectorClock

VectorClock manages a vector of clocks and maintains the minimum of these clocks.

This class is single thread (ST) only.

Subclassed by petuum::VectorClockMT

Public Functions

VectorClock()
VectorClock(const std::vector<int32_t> &ids)

Initialize ids.size() client clocks, all of them at time 0.

void AddClock(int32_t id, int32_t clock = 0)

Add a clock to the vector clock with an initial timestamp.

id must be unique.

Return
0 on success, negatives on error (e.g., duplicated id).

int32_t Tick(int32_t id)

Increment client’s clock.

Accordingly update slowest_client_clock_.

Return
the minimum clock if the client identified by id is the slowest; 0 if not; a negative value on error.

int32_t TickUntil(int32_t id, int32_t clock)
int32_t get_clock(int32_t id) const

Getters.

int32_t get_min_clock() const
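
The return convention of Tick() is easier to follow with a small model. VectorClockSketch below is a hypothetical single-threaded re-creation, not the Petuum class. It assumes that Tick() reports the new minimum only when the tick actually advances the minimum, and that -1 is the error value for an unknown id.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical single-threaded model of a vector clock; not Petuum's code.
class VectorClockSketch {
 public:
  void AddClock(int32_t id, int32_t clock = 0) { clocks_[id] = clock; }

  // Increments id's clock. Returns the new minimum if the minimum advanced
  // (id was the only slowest clock), 0 if it did not, -1 if id is unknown.
  int32_t Tick(int32_t id) {
    auto it = clocks_.find(id);
    if (it == clocks_.end()) return -1;
    const int32_t old_min = get_min_clock();
    ++it->second;
    const int32_t new_min = get_min_clock();
    return new_min > old_min ? new_min : 0;
  }

  // Returns the clock of id, or -1 if id is unknown.
  int32_t get_clock(int32_t id) const {
    auto it = clocks_.find(id);
    return it == clocks_.end() ? -1 : it->second;
  }

  // Returns the smallest clock, or 0 when no clock has been added.
  int32_t get_min_clock() const {
    if (clocks_.empty()) return 0;
    int32_t min_clock = clocks_.begin()->second;
    for (const auto &kv : clocks_) min_clock = std::min(min_clock, kv.second);
    return min_clock;
  }

 private:
  std::map<int32_t, int32_t> clocks_;
};
```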
class petuum::VectorClockMT

VectorClockMT is a thread-safe extension of VectorClock.

Inherits from petuum::VectorClock, noncopyable

Public Functions

VectorClockMT()
VectorClockMT(const std::vector<int32_t> &ids)
void AddClock(int32_t id, int32_t clock = 0)

Override VectorClock.

int32_t Tick(int32_t id)

Increment client’s clock.

Accordingly update slowest_client_clock_.

Return
the minimum clock if the client identified by id is the slowest; 0 if not; a negative value on error.

int32_t TickUntil(int32_t id, int32_t clock)
int32_t get_clock(int32_t id) const

Accessor to a particular clock.

int32_t get_min_clock() const
void petuum::GetHostInfos(std::string server_file, std::map<int32_t, HostInfo> *host_map)

Read in a file containing a list of servers.

‘server_file’ needs to have the following line structure:

<id> <ip> <port> (tab as the delimiter), for example:
1 128.0.1.1 80

Note that the first line of the file is treated as the name node.
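
The line format can be parsed with standard streams. The sketch below is a hypothetical parser, not the body of petuum::GetHostInfos(); it reads from a std::istream rather than a file name, uses its own HostInfoSketch struct, and silently skips malformed lines, all of which are assumptions made for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical mirror of the id/ip/port fields; not petuum::HostInfo.
struct HostInfoSketch {
  int32_t id;
  std::string ip;
  std::string port;
};

// Parses "<id>\t<ip>\t<port>" lines from `in` into host_map.
// Lines that do not contain all three fields are skipped.
inline void ParseHostInfosSketch(std::istream &in,
                                 std::map<int32_t, HostInfoSketch> *host_map) {
  std::string line;
  while (std::getline(in, line)) {
    std::istringstream fields(line);
    HostInfoSketch info{};
    if (fields >> info.id >> info.ip >> info.port) {
      (*host_map)[info.id] = info;
    }
  }
}
```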
