bamboo.dht
Class Dht

java.lang.Object
  extended by bamboo.util.StandardStage
      extended by bamboo.dht.Dht
All Implemented Interfaces:
StorageManager.StorageMonitor, EventHandlerIF, SingleThreadedEventHandlerIF

public class Dht
extends StandardStage
implements SingleThreadedEventHandlerIF, StorageManager.StorageMonitor

Distributed hash table layer for Bamboo.

Version:
$Id: Dht.java,v 1.52 2005/03/02 02:28:26 srhea Exp $
Author:
Sean C. Rhea

Nested Class Summary
static class Dht.DiskSizeMsg
           
static class Dht.ForwardedGetReqMsg
           
static class Dht.ForwardedGetState
           
static class Dht.ForwardThroughLeafSetReq
           
static class Dht.GetReq
           
static class Dht.GetReqPayload
           
protected static class Dht.GetReqState
           
static class Dht.GetResp
           
static class Dht.GetRespMsg
           
static interface Dht.NetMsgResCb
           
static class Dht.PutReq
           
static class Dht.PutReqPayload
           
static class Dht.PutResp
           
static class Dht.PutRespMsg
           
 
Field Summary
protected  Map active_gets
           
protected  Map active_puts
           
protected static long app_id
           
protected  double[] avail_tokens
          One token per byte.
protected  Map client_to_usage
           
protected  double[] disk_sizes
          The sizes of our virtual disks, in bytes.
protected  double[] fill_rates
          Fill rates in bytes per millisecond, only set by compute_rates ().
 ASyncCore.TimerCB forwarded_get_timeout
           
protected static int FORWARDED_GET_TIMEOUT
           
protected  Map forwarded_gets
           
protected  Rpc.RequestCB fwd_thru_ls_req_cb
           
protected  Rpc.ResponseCB fwd_thru_ls_resp_cb
           
protected  ASyncCore.TimerCB get_retry_cb
           
protected static long GET_TIMEOUT
           
protected  boolean initialized
           
protected  boolean iterative_routing
           
protected  long[] last_fill_time
          The last time we added any tokens to each bucket.
protected  int leaf_set_size
           
protected  Map[] ls_disk_sizes
          Keep track of the disk sizes of our leaf set, and scale put rates based on them.
protected  BigInteger MAX_GUID
           
protected  double[] max_tokens
          Capacity limits for the token buckets.
static int MAX_TTL_SEC
           
protected static BigInteger MIN_GUID
           
protected  int min_replica_count
           
protected static BigInteger NEG_ONE
           
protected  long next_get_seq
           
protected  long next_put_seq
           
protected  BambooNeighborInfo[] preds
           
protected  ASyncCore.TimerCB put_retry_cb
           
protected static long PUT_TIMEOUT
           
protected  Random rand
           
protected  BigInteger resp_high
           
protected  BigInteger resp_low
           
protected  Rpc rpc
           
protected  LinkedList slop
           
protected  int[] slop_cap
           
protected  int[] slop_size
           
protected  ASyncCore.TimerCB slop_to_disk_cb
           
protected  Map[] slop_usage
           
protected  BambooNeighborInfo[] succs
           
protected  ASyncCore.TimerCB swap_disk_sizes_cb
           
protected  long total_usage
           
protected  int[] ttl_sec_ranges
           
protected  LinkedList wait_q
           
 
Fields inherited from class bamboo.util.StandardStage
acore, classifier, DEBUG, event_types, inb_msg_types, logger, my_node_id, my_sink, outb_msg_types, sim_running
 
Constructor Summary
Dht()
           
 
Method Summary
protected  void add_to_disk(int size, int ttl_sec)
           
protected  void add_to_slop(BambooRouteDeliver msg, Dht.PutReqPayload payload)
           
protected  boolean below_fair_share(InetAddress client, int size, int ttl_sec)
           
protected  void compute_rates()
           
protected  double disk_avail_rate(int ttl_sec)
           
protected  boolean disk_space_avail(int size, int ttl_sec)
           
protected  void forward_thru_ls(Object[] pair)
           
 void forwarded_get_advance(Dht.ForwardedGetState state)
           
protected  void handle_disk_size_msg(Dht.DiskSizeMsg msg)
           
protected  void handle_forwarded_get_req_msg(Dht.ForwardedGetReqMsg msg)
           
protected  void handle_get_by_guid_resp(StorageManager.GetByGuidResp resp)
          Returns all values whose keys are greater than the given placemark, if any.
protected  void handle_get_req_payload(BambooRouteDeliver msg, Dht.GetReqPayload payload)
           
protected  void handle_get_req(Dht.GetReq req)
           
protected  void handle_get_resp_msg(Dht.GetRespMsg msg)
           
protected  void handle_leaf_set_changed(BambooLeafSetChanged msg)
           
protected  void handle_put_or_remove_resp(PutOrRemoveResp ack)
           
protected  void handle_put_req_payload(BambooRouteDeliver msg, Dht.PutReqPayload payload)
           
protected  void handle_put_req(Dht.PutReq req)
           
protected  void handle_put_resp_msg(Dht.PutRespMsg msg)
           
 void handleEvent(QueueElementIF item)
           
 void init(ConfigDataIF config)
           
protected  long next_get_seq()
           
protected  long next_put_seq()
           
protected  void return_to_client(ostore.network.NetworkMessage msg)
           
protected  void send_disk_size_msg(ostore.util.NodeId peer)
           
protected  void set_forwarded_get_timeout()
           
protected  boolean slop_empty(int ttl_sec)
           
protected  boolean slop_space_avail(InetAddress client, int size, int ttl_sec)
           
protected  long slop_usage(int disk, InetAddress client)
           
protected  void start_get_req(Dht.GetReqState state)
           
 void storage_changed(boolean added, InetAddress client_id, long size)
           
protected  int storage_size(Dht.PutReqPayload payload)
           
protected  int ttl_to_vdisk(int ttl_sec)
           
 
Methods inherited from class bamboo.util.StandardStage
BUG, BUG, BUG, config_get_boolean, config_get_double, config_get_int, config_get_string, configGetInt, destroy, dispatch, enqueue, handleEvents, lookup_stage, now_ms, timer_ms
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

rpc

protected Rpc rpc

initialized

protected boolean initialized

wait_q

protected LinkedList wait_q

app_id

protected static final long app_id

iterative_routing

protected boolean iterative_routing

rand

protected Random rand

next_put_seq

protected long next_put_seq

next_get_seq

protected long next_get_seq

active_puts

protected Map active_puts

active_gets

protected Map active_gets

forwarded_gets

protected Map forwarded_gets

MAX_TTL_SEC

public static int MAX_TTL_SEC

ttl_sec_ranges

protected int[] ttl_sec_ranges

client_to_usage

protected Map client_to_usage

total_usage

protected long total_usage

avail_tokens

protected double[] avail_tokens
One token per byte. Filled according to fill_rates, below.


max_tokens

protected double[] max_tokens
Capacity limits for the token buckets.


fill_rates

protected double[] fill_rates
Fill rates in bytes per millisecond, only set by compute_rates ().


disk_sizes

protected double[] disk_sizes
The sizes of our virtual disks, in bytes. The defaults work out to 2 kB per second per node with 8 replicas (leaf_set_size = 4).


ls_disk_sizes

protected Map[] ls_disk_sizes
Keep track of the disk sizes of our leaf set, and scale put rates based on them.


last_fill_time

protected long[] last_fill_time
The last time we added any tokens to each bucket.


min_replica_count

protected int min_replica_count

preds

protected BambooNeighborInfo[] preds

succs

protected BambooNeighborInfo[] succs

leaf_set_size

protected int leaf_set_size

resp_low

protected BigInteger resp_low

resp_high

protected BigInteger resp_high

MIN_GUID

protected static final BigInteger MIN_GUID

MAX_GUID

protected BigInteger MAX_GUID

NEG_ONE

protected static final BigInteger NEG_ONE

swap_disk_sizes_cb

protected ASyncCore.TimerCB swap_disk_sizes_cb

slop

protected LinkedList slop

slop_size

protected int[] slop_size

slop_cap

protected int[] slop_cap

slop_usage

protected Map[] slop_usage

slop_to_disk_cb

protected ASyncCore.TimerCB slop_to_disk_cb

PUT_TIMEOUT

protected static final long PUT_TIMEOUT
See Also:
Constant Field Values

put_retry_cb

protected ASyncCore.TimerCB put_retry_cb

GET_TIMEOUT

protected static final long GET_TIMEOUT
See Also:
Constant Field Values

get_retry_cb

protected ASyncCore.TimerCB get_retry_cb

FORWARDED_GET_TIMEOUT

protected static final int FORWARDED_GET_TIMEOUT
See Also:
Constant Field Values

forwarded_get_timeout

public ASyncCore.TimerCB forwarded_get_timeout

fwd_thru_ls_resp_cb

protected Rpc.ResponseCB fwd_thru_ls_resp_cb

fwd_thru_ls_req_cb

protected Rpc.RequestCB fwd_thru_ls_req_cb
Constructor Detail

Dht

public Dht()
    throws Exception
Throws:
Exception
Method Detail

next_put_seq

protected long next_put_seq()

next_get_seq

protected long next_get_seq()

ttl_to_vdisk

protected int ttl_to_vdisk(int ttl_sec)

storage_changed

public void storage_changed(boolean added,
                            InetAddress client_id,
                            long size)
Specified by:
storage_changed in interface StorageManager.StorageMonitor

below_fair_share

protected boolean below_fair_share(InetAddress client,
                                   int size,
                                   int ttl_sec)

compute_rates

protected void compute_rates()

handle_leaf_set_changed

protected void handle_leaf_set_changed(BambooLeafSetChanged msg)

send_disk_size_msg

protected void send_disk_size_msg(ostore.util.NodeId peer)

handle_disk_size_msg

protected void handle_disk_size_msg(Dht.DiskSizeMsg msg)

disk_space_avail

protected boolean disk_space_avail(int size,
                                   int ttl_sec)

add_to_disk

protected void add_to_disk(int size,
                           int ttl_sec)

disk_avail_rate

protected double disk_avail_rate(int ttl_sec)

storage_size

protected int storage_size(Dht.PutReqPayload payload)

slop_empty

protected boolean slop_empty(int ttl_sec)

slop_space_avail

protected boolean slop_space_avail(InetAddress client,
                                   int size,
                                   int ttl_sec)

add_to_slop

protected void add_to_slop(BambooRouteDeliver msg,
                           Dht.PutReqPayload payload)

slop_usage

protected long slop_usage(int disk,
                          InetAddress client)

init

public void init(ConfigDataIF config)
          throws Exception
Specified by:
init in interface EventHandlerIF
Overrides:
init in class StandardStage
Throws:
Exception

handleEvent

public void handleEvent(QueueElementIF item)
Specified by:
handleEvent in interface EventHandlerIF
Overrides:
handleEvent in class StandardStage

handle_put_req

protected void handle_put_req(Dht.PutReq req)

handle_put_resp_msg

protected void handle_put_resp_msg(Dht.PutRespMsg msg)

handle_get_req

protected void handle_get_req(Dht.GetReq req)

handle_get_resp_msg

protected void handle_get_resp_msg(Dht.GetRespMsg msg)

handle_put_req_payload

protected void handle_put_req_payload(BambooRouteDeliver msg,
                                      Dht.PutReqPayload payload)

handle_get_req_payload

protected void handle_get_req_payload(BambooRouteDeliver msg,
                                      Dht.GetReqPayload payload)

handle_forwarded_get_req_msg

protected void handle_forwarded_get_req_msg(Dht.ForwardedGetReqMsg msg)

start_get_req

protected void start_get_req(Dht.GetReqState state)

handle_put_or_remove_resp

protected void handle_put_or_remove_resp(PutOrRemoveResp ack)

handle_get_by_guid_resp

protected void handle_get_by_guid_resp(StorageManager.GetByGuidResp resp)
Returns all values whose keys are greater than the given placemark, if any. Returns a placemark equal to the largest key of the values returned, or all zeros if no values are returned. All zeros is appropriate as a lower open limit on placemarks since time_usec cannot be zero in any real key.

Using the standard StorageManager.Key.compareTo function, instead of a Comparator that compares guids first, works here because all values for a given get have the same guid.


set_forwarded_get_timeout

protected void set_forwarded_get_timeout()

forwarded_get_advance

public void forwarded_get_advance(Dht.ForwardedGetState state)

return_to_client

protected void return_to_client(ostore.network.NetworkMessage msg)

forward_thru_ls

protected void forward_thru_ls(Object[] pair)